You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/06/08 18:04:00 UTC

svn commit: r1348133 - in /activemq/activemq-apollo/trunk/apollo-network/src: main/scala/org/apache/activemq/apollo/broker/network/ main/scala/org/apache/activemq/apollo/broker/network/dto/ test/scala/org/apache/activemq/apollo/broker/network/

Author: chirino
Date: Fri Jun  8 16:04:00 2012
New Revision: 1348133

URL: http://svn.apache.org/viewvc?rev=1348133&view=rev
Log:
Network bridging now working with a large load of messages.

Modified:
    activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/BrokerLoadMonitor.scala
    activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/BrokerMetrics.scala
    activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/NetworkManager.scala
    activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/StompBridgingStrategy.scala
    activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/NetworkManagerDTO.java
    activemq/activemq-apollo/trunk/apollo-network/src/test/scala/org/apache/activemq/apollo/broker/network/NetworkTest.scala

Modified: activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/BrokerLoadMonitor.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/BrokerLoadMonitor.scala?rev=1348133&r1=1348132&r2=1348133&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/BrokerLoadMonitor.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/BrokerLoadMonitor.scala Fri Jun  8 16:04:00 2012
@@ -39,16 +39,15 @@ trait BrokerLoadListener {
   def on_load_change(broker_load:LoadStatusDTO)
 }
 object RestLoadMonitor extends Log
-class RestLoadMonitor extends BaseService with BrokerLoadMonitor {
+class RestLoadMonitor(manager:NetworkManager) extends BaseService with BrokerLoadMonitor {
   import collection.JavaConversions._
   import RestLoadMonitor._
   
   val dispatch_queue = createQueue("rest load monitor")
   val members = HashMap[String, LoadMonitor]()
-  var poll_interval = 5*1000;
 
   protected def _start(on_completed: Task) = {
-    schedule_reoccurring(1, SECONDS) {
+    schedule_reoccurring(manager.monitoring_interval, SECONDS) {
       for(monitor <- members.values) {
         monitor.poll
       }
@@ -110,7 +109,9 @@ class RestLoadMonitor extends BaseServic
     dispatch_queue {
       for(service <- member.services) {
         if( service.kind == "web_admin" ) {
-          members.put(member.id, LoadMonitor(member.id, new URL(service.address)))
+          var monitor = LoadMonitor(member.id, new URL(service.address))
+          members.put(member.id, monitor)
+          monitor.poll
         }
       }
     }

Modified: activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/BrokerMetrics.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/BrokerMetrics.scala?rev=1348133&r1=1348132&r2=1348133&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/BrokerMetrics.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/BrokerMetrics.scala Fri Jun  8 16:04:00 2012
@@ -63,7 +63,7 @@ class BrokerMetrics() {
       dest_load.message_size = dest.message_size
 
       // Lets not include the network consumers in the the consumer rates..
-      val consumers = dest.consumers.filter(_.user == network_user).toArray
+      val consumers = dest.consumers.filter(_.user != network_user).toArray
 
       dest_load.consumer_count = consumers.size
       dest_load.dequeue_size_rate = 0

Modified: activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/NetworkManager.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/NetworkManager.scala?rev=1348133&r1=1348132&r2=1348133&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/NetworkManager.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/NetworkManager.scala Fri Jun  8 16:04:00 2012
@@ -56,6 +56,10 @@ class NetworkManager(broker: Broker) ext
   var metrics_map = HashMap[String, BrokerMetrics]()
   val bridges = HashMap[BridgeInfo, BridgeDeployer]()
 
+  def network_user = Option(config.user).getOrElse("network")
+  def network_password = config.password
+  def monitoring_interval = OptionSupport(config.monitoring_interval).getOrElse(5)
+
   protected def _start(on_completed: Task) = {
     import collection.JavaConversions._
 
@@ -65,7 +69,7 @@ class NetworkManager(broker: Broker) ext
     membership_monitor.listener = this
     membership_monitor.start(NOOP)
 
-    load_monitor = new RestLoadMonitor
+    load_monitor = new RestLoadMonitor(this)
     load_monitor.listener = this
     load_monitor.start(NOOP)
 
@@ -93,7 +97,7 @@ class NetworkManager(broker: Broker) ext
   }
 
   def on_load_change(dto: LoadStatusDTO) = dispatch_queue {
-    metrics_map.getOrElseUpdate(dto.id, new BrokerMetrics()).update(dto, config.user)
+    metrics_map.getOrElseUpdate(dto.id, new BrokerMetrics()).update(dto, network_user)
   }
 
   def load_analysis = {

Modified: activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/StompBridgingStrategy.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/StompBridgingStrategy.scala?rev=1348133&r1=1348132&r2=1348133&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/StompBridgingStrategy.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/StompBridgingStrategy.scala Fri Jun  8 16:04:00 2012
@@ -40,20 +40,22 @@ class StompBridgingStrategy(val manager:
 
   val bridges = HashMap[(String, String), Bridge]()
 
-  def bridge_user = manager.config.user
-  def bridge_password = manager.config.password
+  def network_user = manager.network_user
+  def network_password = manager.network_password
 
-  def deploy(info:BridgeInfo) = {
+  def deploy(bridge_info:BridgeInfo) = {
     dispatch_queue.assertExecuting()
-    val bridge = bridges.getOrElseUpdate((info.from, info.to), new Bridge(info.from, info.to))
-    bridge.deploy(info.kind, info.dest)
+    val bridge = bridges.getOrElseUpdate((bridge_info.from, bridge_info.to), new Bridge(bridge_info.from, bridge_info.to))
+    info("Deploying bridge for destination %s, from %s to %s", bridge_info.dest, bridge_info.from, bridge_info.to)
+    bridge.deploy(bridge_info.kind, bridge_info.dest)
   }
 
 
-  def undeploy(info:BridgeInfo) = {
+  def undeploy(bridge_info:BridgeInfo) = {
     dispatch_queue.assertExecuting()
-    for( bridge <- bridges.get((info.from, info.to)) ) {
-      bridge.undeploy(info.kind, info.dest)
+    for( bridge <- bridges.get((bridge_info.from, bridge_info.to)) ) {
+      info("Undeploying bridge for destination %s, from %s to %s", bridge_info.dest, bridge_info.from, bridge_info.to)
+      bridge.undeploy(bridge_info.kind, bridge_info.dest)
     }
   }
 
@@ -69,7 +71,8 @@ class StompBridgingStrategy(val manager:
         case MESSAGE =>
           // forward it..
           frame.action(SEND)
-          println("forwarding message: "+frame.getHeader(MESSAGE_ID))
+          var msgid = frame.getHeader(MESSAGE_ID)
+          debug("forwarding message: %s", msgid)
           to_connection.send(frame, ()=>{
             // Ack it if the original connection is still up...
             // TODO: if it's not a we will probably get a dup/redelivery.
@@ -77,13 +80,13 @@ class StompBridgingStrategy(val manager:
             if( from_connection.state eq original_state ) {
               val ack = new StompFrame(ACK);
               ack.addHeader(SUBSCRIPTION, frame.getHeader(SUBSCRIPTION))
-              ack.addHeader(MESSAGE_ID, frame.getHeader(MESSAGE_ID))
+              ack.addHeader(MESSAGE_ID, msgid)
               from_connection.send(ack, null)
-              println("forwarded message, now acking: "+frame.getHeader(MESSAGE_ID))
+              debug("forwarded message, now acking: %s", msgid)
             }
           })
         case _ =>
-          println("unhandled stomp frame: "+frame)
+          println("unhandled stomp frame: %s", frame)
       }
     }
 
@@ -128,8 +131,8 @@ class StompBridgingStrategy(val manager:
           val to_stomp = new Stomp()
           to_stomp.setDispatchQueue(dispatch_queue)
           to_stomp.setRemoteURI(uri)
-          to_stomp.setLogin(bridge_user)
-          to_stomp.setPasscode(bridge_password)
+          to_stomp.setLogin(network_user)
+          to_stomp.setPasscode(network_password)
           to_stomp.setBlockingExecutor(Broker.BLOCKABLE_THREAD_POOL)
           val headers = new Properties()
           headers.put("client-type", "apollo-bridge")
@@ -175,17 +178,19 @@ class StompBridgingStrategy(val manager:
           // Reconnect any subscriptions.
           subscriptions.keySet.foreach(subscribe(_))
           // Re-send messages..
-          pending_sends.values.foreach(x => do_send(x._1, x._2))
+          pending_sends.values.foreach(x => request(x._1, x._2))
 
         }
 
-        def do_send(frame:StompFrame, on_complete: ()=>Unit) = {
+        def request(frame:StompFrame, on_complete: ()=>Unit) = {
           connection.request(frame, new org.fusesource.stomp.client.Callback[StompFrame] {
             override def onSuccess(response: StompFrame) = on_complete()
             override def onFailure(value: Throwable) = failed(value)
           })
         }
 
+        def send(frame:StompFrame) = connection.send(frame, null)
+
         def failed(value: Throwable)= {
           debug("Bridge connection to %s failed due to: ", uri, value)
           close(ReconnectDelayState(1000))
@@ -235,14 +240,18 @@ class StompBridgingStrategy(val manager:
         }
       }
 
-      def send(destination:StompFrame, on_complete: ()=>Unit) = {
-        val id = next_id
-        val cb = ()=>{
-          pending_sends.remove(id)
-          on_complete()
+      def send(frame:StompFrame, on_complete: ()=>Unit) = {
+        if( on_complete!=null ) {
+          val id = next_id
+          val cb = ()=>{
+            pending_sends.remove(id)
+            on_complete()
+          }
+          pending_sends.put(id, (frame, cb))
+          react[ConnectedState] { state => state.request(frame, cb) }
+        } else {
+          react[ConnectedState] { state => state.send(frame) }
         }
-        pending_sends.put(id, (destination, cb))
-        react[ConnectedState] { state => state.do_send(destination, cb) }
       }
     }
 

Modified: activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/NetworkManagerDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/NetworkManagerDTO.java?rev=1348133&r1=1348132&r2=1348133&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/NetworkManagerDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/NetworkManagerDTO.java Fri Jun  8 16:04:00 2012
@@ -43,6 +43,9 @@ public class NetworkManagerDTO extends C
     @XmlAttribute(name="duplex")
     public Boolean duplex;
 
+    @XmlAttribute(name="monitoring_interval")
+    public Integer monitoring_interval;
+
     @XmlElement(name="member")
     public ArrayList<ClusterMemberDTO> members = new ArrayList<ClusterMemberDTO>();
 }

Modified: activemq/activemq-apollo/trunk/apollo-network/src/test/scala/org/apache/activemq/apollo/broker/network/NetworkTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-network/src/test/scala/org/apache/activemq/apollo/broker/network/NetworkTest.scala?rev=1348133&r1=1348132&r2=1348133&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-network/src/test/scala/org/apache/activemq/apollo/broker/network/NetworkTest.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-network/src/test/scala/org/apache/activemq/apollo/broker/network/NetworkTest.scala Fri Jun  8 16:04:00 2012
@@ -19,12 +19,13 @@ package org.apache.activemq.apollo.broke
 
 import org.scalatest.matchers.ShouldMatchers
 import org.scalatest.BeforeAndAfterEach
-import org.apache.activemq.apollo.broker.MultiBrokerTestSupport
 import javax.jms.Session._
 import org.fusesource.stomp.jms.{StompJmsDestination, StompJmsConnectionFactory}
 import collection.mutable.ListBuffer
 import javax.jms.{Message, TextMessage, Connection, ConnectionFactory}
 import java.util.concurrent.TimeUnit._
+import org.apache.activemq.apollo.broker.{Broker, MultiBrokerTestSupport}
+import org.fusesource.hawtdispatch._
 
 class NetworkTest extends MultiBrokerTestSupport with ShouldMatchers with BeforeAndAfterEach {
 
@@ -69,17 +70,27 @@ class NetworkTest extends MultiBrokerTes
     case _ => None
   }
 
-  test("forward one message") {
+  test("forward 10000 messages") {
     val connections = create_connections
-    
-    val s0 = connections(0).createSession(false, AUTO_ACKNOWLEDGE)
-    val p0 = s0.createProducer(test_destination())
-    p0.send(s0.createTextMessage("1"))
+    val message_count = 10000;
+
+    var dest = test_destination()
+    val data = "x" * 1024
+
+    Broker.BLOCKABLE_THREAD_POOL {
+      val s0 = connections(0).createSession(false, AUTO_ACKNOWLEDGE)
+      val p0 = s0.createProducer(dest)
+      for( i <- 0 until message_count ) {
+        p0.send(s0.createTextMessage(i+":"+data))
+      }
+    }
 
     val s1 = connections(1).createSession(false, AUTO_ACKNOWLEDGE)
-    val c1 = s1.createConsumer(test_destination())
+    val c1 = s1.createConsumer(dest)
     within(30, SECONDS) {
-      text(c1.receive()) should be(Some("1"))
+      for( i <- 0 until message_count ) {
+        text(c1.receive()) should be(Some(i+":"+data))
+      }
     }
   }