You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/06/07 17:04:25 UTC

svn commit: r1347661 [1/2] - in /activemq/activemq-apollo/trunk: ./ apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-broker/src/test/scala/ apollo-dto/src/main/java/org/apache/activemq/apollo/dto/ apollo-dto/src/main/resources/org...

Author: chirino
Date: Thu Jun  7 15:04:23 2012
New Revision: 1347661

URL: http://svn.apache.org/viewvc?rev=1347661&view=rev
Log:
Adding an optional network manager which handles forwarding of messages between brokers to allow the creation of broker networks/federations.

Added:
    activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationLoadDTO.java
    activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/LoadStatusDTO.java
      - copied, changed from r1341950, activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/CollectionsSupport.scala
    activemq/activemq-apollo/trunk/apollo-network/
    activemq/activemq-apollo/trunk/apollo-network/pom.xml
    activemq/activemq-apollo/trunk/apollo-network/src/
    activemq/activemq-apollo/trunk/apollo-network/src/main/
    activemq/activemq-apollo/trunk/apollo-network/src/main/resources/
    activemq/activemq-apollo/trunk/apollo-network/src/main/resources/META-INF/
    activemq/activemq-apollo/trunk/apollo-network/src/main/resources/META-INF/services/
    activemq/activemq-apollo/trunk/apollo-network/src/main/resources/META-INF/services/org.apache.activemq.apollo/
    activemq/activemq-apollo/trunk/apollo-network/src/main/resources/META-INF/services/org.apache.activemq.apollo/custom-service-factory.index
    activemq/activemq-apollo/trunk/apollo-network/src/main/resources/META-INF/services/org.apache.activemq.apollo/dto-module.index
    activemq/activemq-apollo/trunk/apollo-network/src/main/resources/org/
    activemq/activemq-apollo/trunk/apollo-network/src/main/resources/org/apache/
    activemq/activemq-apollo/trunk/apollo-network/src/main/resources/org/apache/activemq/
    activemq/activemq-apollo/trunk/apollo-network/src/main/resources/org/apache/activemq/apollo/
    activemq/activemq-apollo/trunk/apollo-network/src/main/resources/org/apache/activemq/apollo/broker/
    activemq/activemq-apollo/trunk/apollo-network/src/main/resources/org/apache/activemq/apollo/broker/network/
    activemq/activemq-apollo/trunk/apollo-network/src/main/resources/org/apache/activemq/apollo/broker/network/dto/
    activemq/activemq-apollo/trunk/apollo-network/src/main/resources/org/apache/activemq/apollo/broker/network/dto/jaxb.index
    activemq/activemq-apollo/trunk/apollo-network/src/main/scala/
    activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/
    activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/
    activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/
    activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/
    activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/
    activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/
    activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/BridgingStrategy.scala
      - copied, changed from r1341950, activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/CollectionsSupport.scala
    activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/BrokerLoadMonitor.scala
    activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/BrokerMetrics.scala
    activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/ClusterMembershipMonitor.scala
      - copied, changed from r1341950, activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/CollectionsSupport.scala
    activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/NetworkManager.scala
    activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/StompBridgingStrategy.scala
    activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/
    activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/BridgeDTO.java
      - copied, changed from r1341950, activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/CollectionsSupport.scala
    activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/ClusterMemberDTO.java
      - copied, changed from r1341950, activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/CollectionsSupport.scala
    activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/ClusterServiceDTO.java
      - copied, changed from r1341950, activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/CollectionsSupport.scala
    activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/Module.scala
      - copied, changed from r1341950, activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/CollectionsSupport.scala
    activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/NetworkManagerDTO.java
      - copied, changed from r1341950, activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/CollectionsSupport.scala
    activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/RuleDTO.java
      - copied, changed from r1341950, activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/CollectionsSupport.scala
    activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/package-info.java
      - copied, changed from r1341950, activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/CollectionsSupport.scala
    activemq/activemq-apollo/trunk/apollo-network/src/test/
    activemq/activemq-apollo/trunk/apollo-network/src/test/resources/
    activemq/activemq-apollo/trunk/apollo-network/src/test/resources/apollo-network-1.xml
    activemq/activemq-apollo/trunk/apollo-network/src/test/resources/apollo-network-2.xml
    activemq/activemq-apollo/trunk/apollo-network/src/test/resources/log4j.properties   (with props)
    activemq/activemq-apollo/trunk/apollo-network/src/test/scala/
    activemq/activemq-apollo/trunk/apollo-network/src/test/scala/org/
    activemq/activemq-apollo/trunk/apollo-network/src/test/scala/org/apache/
    activemq/activemq-apollo/trunk/apollo-network/src/test/scala/org/apache/activemq/
    activemq/activemq-apollo/trunk/apollo-network/src/test/scala/org/apache/activemq/apollo/
    activemq/activemq-apollo/trunk/apollo-network/src/test/scala/org/apache/activemq/apollo/broker/
    activemq/activemq-apollo/trunk/apollo-network/src/test/scala/org/apache/activemq/apollo/broker/network/
    activemq/activemq-apollo/trunk/apollo-network/src/test/scala/org/apache/activemq/apollo/broker/network/NetworkTest.scala
    activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/StateMachine.scala
Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/BrokerFunSuiteSupport.scala
    activemq/activemq-apollo/trunk/apollo-dto/src/main/resources/org/apache/activemq/apollo/dto/jaxb.index
    activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/SecurityTest.scala
    activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/SslSecurityTest.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompWebTest.scala
    activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/BaseService.scala
    activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/CollectionsSupport.scala
    activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/BrokerResource.scala
    activemq/activemq-apollo/trunk/pom.xml

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala?rev=1347661&r1=1347660&r2=1347661&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala Thu Jun  7 15:04:23 2012
@@ -33,7 +33,7 @@ import management.ManagementFactory
 import org.apache.activemq.apollo.dto._
 import javax.management.ObjectName
 import org.fusesource.hawtdispatch.TaskTracker._
-import java.util.concurrent.TimeUnit
+import java.util.concurrent.TimeUnit._
 import security.SecuredResource.BrokerKind
 import reflect.BeanProperty
 import java.net.InetSocketAddress
@@ -157,6 +157,9 @@ object Broker extends Log {
   val buffer_pools = new BufferPools
 
   def class_loader:ClassLoader = ClassFinder.class_loader
+  
+  @volatile
+  var now = System.currentTimeMillis()
 
   val version = using(getClass().getResourceAsStream("version.txt")) { source=>
     read_text(source).trim
@@ -259,7 +262,7 @@ class Broker() extends BaseService with 
   var web_server:WebServer = _
 
   @volatile
-  var now = System.currentTimeMillis()
+  def now = Broker.now
 
   var config_log:Log = Log(new MemoryLogger(Broker.log))
   var audit_log:Log = Broker
@@ -297,8 +300,12 @@ class Broker() extends BaseService with 
     check_file_limit
 
     BrokerRegistry.add(this)
-    schedule_now_update
-    schedule_virtualhost_maintenance
+    schedule_reoccurring(100, MILLISECONDS) {
+      Broker.now = System.currentTimeMillis
+    }
+    schedule_reoccurring(1, SECONDS) {
+      virtualhost_maintenance
+    }
 
     val tracker = new LoggingTracker("broker startup", console_log, SERVICE_TIMEOUT)
     apply_update(tracker)
@@ -340,28 +347,17 @@ class Broker() extends BaseService with 
 
   }
 
-  def schedule_now_update:Unit = dispatch_queue.after(100, TimeUnit.MILLISECONDS) {
-    if( service_state.is_starting_or_started ) {
-      now = System.currentTimeMillis
-      schedule_now_update
-    }
-  }
-
-  def schedule_virtualhost_maintenance:Unit = dispatch_queue.after(1, TimeUnit.SECONDS) {
-    if( service_state.is_started ) {
-      val active_sessions = connections.values.flatMap(_.session_id).toSet
-
-      virtual_hosts.values.foreach { host=>
-        host.dispatch_queue {
-          if(host.service_state.is_started) {
-            host.router.remove_temp_destinations(active_sessions)
-          }
+  def virtualhost_maintenance = {
+    val active_sessions = connections.values.flatMap(_.session_id).toSet
+    virtual_hosts.values.foreach { host=>
+      host.dispatch_queue {
+        if(host.service_state.is_started) {
+          host.router.remove_temp_destinations(active_sessions)
         }
       }
-
-      schedule_virtualhost_maintenance
     }
   }
+
   protected def init_logs = {
     import OptionSupport._
     // Configure the logging categories...

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1347661&r1=1347660&r2=1347661&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Thu Jun  7 15:04:23 2012
@@ -308,6 +308,21 @@ class Queue(val router: LocalRouter, val
     rc
   }
 
+  def load_status = {
+    val rc = new DestinationLoadDTO
+    rc.id = this.id
+    rc.message_count = queue_size
+    rc.message_size = queue_items
+    rc.message_count_enqueue_counter = enqueue_item_counter
+    rc.message_size_enqueue_counter = enqueue_size_counter
+    rc.message_count_dequeue_counter = dequeue_item_counter
+    rc.message_size_dequeue_counter = dequeue_size_counter
+//  TODO: expose selector attribute of consumer.
+//    for( consumer <- all_subscriptions.keys ) {
+//      rc.consumer_selectors.add(consumer.selector)
+//    }
+    rc
+  }
   def status(entries:Boolean=false) = {
     val rc = new QueueStatusDTO
     rc.id = this.id

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/BrokerFunSuiteSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/BrokerFunSuiteSupport.scala?rev=1347661&r1=1347660&r2=1347661&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/BrokerFunSuiteSupport.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/BrokerFunSuiteSupport.scala Thu Jun  7 15:04:23 2012
@@ -43,7 +43,7 @@ class BrokerFunSuiteSupport extends FunS
     broker
   }
 
-  override protected def beforeAll() = {
+  override def beforeAll() = {
     super.beforeAll()
     try {
       broker = createBroker
@@ -54,7 +54,7 @@ class BrokerFunSuiteSupport extends FunS
     }
   }
 
-  override protected def afterAll() = {
+  override def afterAll() = {
     ServiceControl.stop(broker)
     super.afterAll()
   }
@@ -142,3 +142,34 @@ class BrokerFunSuiteSupport extends FunS
   }
 
 }
+
+class MultiBrokerTestSupport extends FunSuiteSupport {
+
+  case class BrokerAdmin(override val broker_config_uri:String) extends BrokerFunSuiteSupport
+
+  def broker_config_uris = Array("xml:classpath:apollo.xml")
+  var admins = Array[BrokerAdmin]()
+
+  override protected def beforeAll() = {
+    super.beforeAll()
+    try {
+      admins = broker_config_uris.map(BrokerAdmin(_))
+      admins.foreach(_.beforeAll)
+    } catch {
+      case e: Throwable => e.printStackTrace
+    }
+  }
+
+  override protected def afterAll() = {
+    for( admin <- admins ) {
+      try {
+        admin.afterAll
+      } catch {
+        case e => debug(e)
+      }
+    }
+    admins = Array()
+    super.afterAll()
+  }
+
+}
\ No newline at end of file

Added: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationLoadDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationLoadDTO.java?rev=1347661&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationLoadDTO.java (added)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationLoadDTO.java Thu Jun  7 15:04:23 2012
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.apollo.dto;
+
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+
+import javax.xml.bind.annotation.*;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+
+/**
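+ * The load figures reported for a single destination: its id, message count and size, and the enqueue/dequeue counters.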
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+@XmlRootElement(name="destination_load")
+@XmlAccessorType(XmlAccessType.FIELD)
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class DestinationLoadDTO {
+
+    @XmlAttribute(name="id")
+    public String id;
+
+    @XmlAttribute(name="message_count")
+    public Long message_count;
+
+    @XmlAttribute(name="message_size")
+    public Long message_size;
+
+    @XmlAttribute(name="message_count_enqueue_counter")
+    public Long message_count_enqueue_counter;
+
+    @XmlAttribute(name="message_size_enqueue_counter")
+    public Long message_size_enqueue_counter;
+
+    @XmlAttribute(name="message_count_dequeue_counter")
+    public Long message_count_dequeue_counter;
+
+    @XmlAttribute(name="message_size_dequeue_counter")
+    public Long message_size_dequeue_counter;
+
+    public HashSet<String> consumer_selectors = new HashSet<String>();
+
+}

Copied: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/LoadStatusDTO.java (from r1341950, activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/CollectionsSupport.scala)
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/LoadStatusDTO.java?p2=activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/LoadStatusDTO.java&p1=activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/CollectionsSupport.scala&r1=1341950&r2=1347661&rev=1347661&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/CollectionsSupport.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/LoadStatusDTO.java Thu Jun  7 15:04:23 2012
@@ -14,20 +14,30 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.apollo.util
+package org.apache.activemq.apollo.dto;
+
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+
+import javax.xml.bind.annotation.*;
+import java.util.ArrayList;
 
 /**
- * <p>
- * </p>
- *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-object CollectionsSupport {
+@XmlRootElement(name="flow_report")
+@XmlAccessorType(XmlAccessType.FIELD)
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class LoadStatusDTO {
+
+    @XmlAttribute(name="id")
+    public String id;
+
+    @XmlAttribute(name="timestamp")
+    public long timestamp;
+
+    @XmlElementRef(name="queue")
+    public ArrayList<DestinationLoadDTO> queues = new ArrayList<DestinationLoadDTO>();
 
-  def diff[T](prev:Set[T], next:Set[T]):(Set[T],Set[T],Set[T]) = {
-    val updating = prev.intersect(next)
-    val adding = next -- updating
-    val removing = prev -- next
-    (adding, updating, removing)
-  }
-}
\ No newline at end of file
+    @XmlElementRef(name="topic")
+    public ArrayList<DestinationLoadDTO> topics = new ArrayList<DestinationLoadDTO>();
+}

Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/resources/org/apache/activemq/apollo/dto/jaxb.index
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/resources/org/apache/activemq/apollo/dto/jaxb.index?rev=1347661&r1=1347660&r2=1347661&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/resources/org/apache/activemq/apollo/dto/jaxb.index (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/resources/org/apache/activemq/apollo/dto/jaxb.index Thu Jun  7 15:04:23 2012
@@ -22,6 +22,7 @@ AggregateDestMetricsDTO
 AuthenticationDTO
 AutoGCServiceDTO
 BrokerDTO
+LoadStatusDTO
 BrokerStatusDTO
 ConnectionStatusDTO
 ConnectorStatusDTO
@@ -30,6 +31,7 @@ CustomServiceDTO
 DataPageDTO
 DestMetricsDTO
 DestinationDTO
+DestinationLoadDTO
 DetectDTO
 DurableSubscriptionDTO
 DurableSubscriptionDestinationDTO

Added: activemq/activemq-apollo/trunk/apollo-network/pom.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-network/pom.xml?rev=1347661&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-network/pom.xml (added)
+++ activemq/activemq-apollo/trunk/apollo-network/pom.xml Thu Jun  7 15:04:23 2012
@@ -0,0 +1,170 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Copyright (C) 2012 FuseSource Corp. All rights reserved.
+    http://fusesource.com
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.activemq</groupId>
+    <artifactId>apollo-scala</artifactId>
+    <version>99-trunk-SNAPSHOT</version>
+    <relativePath>../apollo-scala</relativePath>
+  </parent>
+
+  <groupId>org.apache.activemq</groupId>
+  <artifactId>apollo-network</artifactId>
+  <version>99-trunk-SNAPSHOT</version>
+  <packaging>jar</packaging>
+
+  <name>${project.artifactId}</name>
+  <description>Used to create a federated network of Apollo Brokers</description>
+
+  <properties>
+  </properties>
+
+  <dependencies>
+
+    <dependency>
+      <groupId>org.apache.activemq</groupId>
+      <artifactId>apollo-broker</artifactId>
+      <version>99-trunk-SNAPSHOT</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-math</artifactId>
+      <version>2.2</version>
+    </dependency>
+    <dependency>
+      <groupId>org.fusesource.stompjms</groupId>
+      <artifactId>stompjms-client</artifactId>
+      <version>1.11</version>
+    </dependency>
+
+    <!-- Testing Dependencies -->
+    <dependency>
+      <groupId>org.scalatest</groupId>
+      <artifactId>scalatest_2.9.1</artifactId>
+      <version>${scalatest-version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+      <version>${junit-version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-log4j12</artifactId>
+      <version>${slf4j-version}</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.activemq</groupId>
+      <artifactId>apollo-broker</artifactId>
+      <version>99-trunk-SNAPSHOT</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.activemq</groupId>
+      <artifactId>apollo-util</artifactId>
+      <version>99-trunk-SNAPSHOT</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.activemq</groupId>
+      <artifactId>apollo-stomp</artifactId>
+      <version>99-trunk-SNAPSHOT</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.activemq</groupId>
+      <artifactId>apollo-openwire</artifactId>
+      <version>99-trunk-SNAPSHOT</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.activemq</groupId>
+      <artifactId>apollo-leveldb</artifactId>
+      <version>99-trunk-SNAPSHOT</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.xerial.snappy</groupId>
+      <artifactId>snappy-java</artifactId>
+      <version>1.0.3</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.activemq</groupId>
+      <artifactId>apollo-bdb</artifactId>
+      <version>99-trunk-SNAPSHOT</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.activemq</groupId>
+      <artifactId>apollo-web</artifactId>
+      <version>99-trunk-SNAPSHOT</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.eclipse.jetty.aggregate</groupId>
+      <artifactId>jetty-all-server</artifactId>
+      <version>${jetty-version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>jul-to-slf4j</artifactId>
+      <version>${slf4j-version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>javax.servlet.jsp</groupId>
+      <artifactId>jsp-api</artifactId>
+      <version>2.1</version>
+      <scope>test</scope>
+    </dependency>
+
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <configuration>
+          <forkMode>always</forkMode>
+          <excludes>
+            <exclude>**/EnqueueRateScenariosTest.*</exclude>
+          </excludes>          
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>  
+</project>

Added: activemq/activemq-apollo/trunk/apollo-network/src/main/resources/META-INF/services/org.apache.activemq.apollo/custom-service-factory.index
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-network/src/main/resources/META-INF/services/org.apache.activemq.apollo/custom-service-factory.index?rev=1347661&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-network/src/main/resources/META-INF/services/org.apache.activemq.apollo/custom-service-factory.index (added)
+++ activemq/activemq-apollo/trunk/apollo-network/src/main/resources/META-INF/services/org.apache.activemq.apollo/custom-service-factory.index Thu Jun  7 15:04:23 2012
@@ -0,0 +1,17 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+org.apache.activemq.apollo.broker.network.NetworkManagerFactory
\ No newline at end of file

Added: activemq/activemq-apollo/trunk/apollo-network/src/main/resources/META-INF/services/org.apache.activemq.apollo/dto-module.index
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-network/src/main/resources/META-INF/services/org.apache.activemq.apollo/dto-module.index?rev=1347661&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-network/src/main/resources/META-INF/services/org.apache.activemq.apollo/dto-module.index (added)
+++ activemq/activemq-apollo/trunk/apollo-network/src/main/resources/META-INF/services/org.apache.activemq.apollo/dto-module.index Thu Jun  7 15:04:23 2012
@@ -0,0 +1,17 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+org.apache.activemq.apollo.broker.network.dto.Module
\ No newline at end of file

Added: activemq/activemq-apollo/trunk/apollo-network/src/main/resources/org/apache/activemq/apollo/broker/network/dto/jaxb.index
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-network/src/main/resources/org/apache/activemq/apollo/broker/network/dto/jaxb.index?rev=1347661&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-network/src/main/resources/org/apache/activemq/apollo/broker/network/dto/jaxb.index (added)
+++ activemq/activemq-apollo/trunk/apollo-network/src/main/resources/org/apache/activemq/apollo/broker/network/dto/jaxb.index Thu Jun  7 15:04:23 2012
@@ -0,0 +1,18 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+NetworkManagerDTO
+BridgeDTO
\ No newline at end of file

Copied: activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/BridgingStrategy.scala (from r1341950, activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/CollectionsSupport.scala)
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/BridgingStrategy.scala?p2=activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/BridgingStrategy.scala&p1=activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/CollectionsSupport.scala&r1=1341950&r2=1347661&rev=1347661&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/CollectionsSupport.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/BridgingStrategy.scala Thu Jun  7 15:04:23 2012
@@ -14,20 +14,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.apollo.util
+package org.apache.activemq.apollo.broker.network
 
-/**
- * <p>
- * </p>
- *
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-object CollectionsSupport {
+/**
+ * Describes a bridge between two endpoints for the destination `dest` of the
+ * given `kind` (NetworkManager uses "queue").  NetworkManager fills `from` and
+ * `to` with cluster member ids; the copy it hands to a BridgingStrategy carries
+ * the service addresses of those members instead.
+ */
+case class BridgeInfo(from:String, to:String, kind:String, dest:String)
+
+/**
+ * A way of realizing bridges over one kind of broker service.
+ */
+trait BridgingStrategy {
+
+  /** Sets up the bridge described by `info`. */
+  def deploy(info:BridgeInfo)
+  /** Tears down the bridge described by `info`. */
+  def undeploy(info:BridgeInfo)
 
-  def diff[T](prev:Set[T], next:Set[T]):(Set[T],Set[T],Set[T]) = {
-    val updating = prev.intersect(next)
-    val adding = next -- updating
-    val removing = prev -- next
-    (adding, updating, removing)
-  }
 }
\ No newline at end of file

Added: activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/BrokerLoadMonitor.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/BrokerLoadMonitor.scala?rev=1347661&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/BrokerLoadMonitor.scala (added)
+++ activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/BrokerLoadMonitor.scala Thu Jun  7 15:04:23 2012
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.broker.network
+
+import org.apache.activemq.apollo.dto.{LoadStatusDTO, JsonCodec}
+import dto.ClusterMemberDTO
+import org.fusesource.hawtdispatch._
+import collection.mutable.HashMap
+import java.net.URL
+import java.util.concurrent.TimeUnit._
+import org.apache.activemq.apollo.broker.Broker
+import org.apache.activemq.apollo.util.FileSupport._
+import org.apache.activemq.apollo.util.{Log, StateMachine, BaseService, Service}
+
+/**
+ * A service which watches the load of a set of cluster members and reports
+ * what it observes to `listener`.
+ */
+trait BrokerLoadMonitor extends Service {
+  // Receiver of the load updates; starts out unset (null).
+  var listener:BrokerLoadListener = _
+  /** Starts monitoring the given cluster member. */
+  def add(member:ClusterMemberDTO)
+  /** Stops monitoring the given cluster member. */
+  def remove(member:ClusterMemberDTO)
+}
+
+/**
+ * Callback interface through which a BrokerLoadMonitor reports load updates.
+ */
+trait BrokerLoadListener {
+  /** Called with the latest load status obtained for a broker. */
+  def on_load_change(broker_load:LoadStatusDTO)
+}
+// Companion object which supplies the logging methods used by the RestLoadMonitor class.
+object RestLoadMonitor extends Log
+class RestLoadMonitor extends BaseService with BrokerLoadMonitor {
+  import collection.JavaConversions._
+  import RestLoadMonitor._
+  
+  val dispatch_queue = createQueue("rest load monitor")
+  val members = HashMap[String, LoadMonitor]()
+  var poll_interval = 5*1000;
+
+  /**
+   * Signals start completion right away and then, once per second, gives
+   * every registered member monitor a chance to poll.
+   */
+  protected def _start(on_completed: Task) = {
+    on_completed.run()
+    schedule_reoccurring(1, SECONDS) {
+      members.values.foreach(_.poll)
+    }
+  }
+
+  /** Nothing is torn down explicitly here; stop completion is signalled right away. */
+  protected def _stop(on_completed: Task) = {
+    on_completed.run()
+  }
+
+  /**
+   * Polls the load status of a single cluster member: reads a LoadStatusDTO
+   * as JSON from `url` and hands it to the listener.  At most one poll is in
+   * flight at a time (a poll is only started from the idle state).
+   *
+   * @param id  the id of the member, stamped on every status that is reported
+   * @param url the location the load status is read from
+   */
+  case class LoadMonitor(id:String, url:URL) extends StateMachine {
+    // Earliest time, on the Broker.now clock, at which the next poll may start.
+    var next_poll = Broker.now
+
+    protected def init() = IdleState()
+
+    case class IdleState() extends State {
+      def poll = react {
+        become(PollingState())
+      }
+    }
+
+    case class PollingState() extends State {
+      override def init() = {
+        // Switch to the blockable thread pool since accessing the data
+        // is a blocking operation.
+        Broker.BLOCKABLE_THREAD_POOL {
+          try {
+            val dto = using(url.openStream()) {
+              is =>
+                JsonCodec.mapper.readValue(is, classOf[LoadStatusDTO])
+            }
+            dto.id = id
+            dto.timestamp = Broker.now
+            listener.on_load_change(dto)
+          } catch {
+            // A failed poll is only logged; the member is simply polled again later.
+            case e:Throwable => debug(e, "Failed poll broker load of %s", id)
+          }
+          // State changes happen back on the monitor's dispatch queue.
+          dispatch_queue {
+            become(IdleState())
+          }
+        }
+      }
+    }
+
+    /**
+     * Starts a poll if the monitor is idle and the poll interval has elapsed.
+     */
+    def poll = {
+      if ( Broker.now >= next_poll ) {
+        state match {
+          case idle:IdleState =>
+            // Push the deadline forward so that poll_interval is honored;
+            // previously next_poll was never advanced and the interval was ignored.
+            // Assumes Broker.now is in milliseconds, like poll_interval -- TODO confirm.
+            next_poll = Broker.now + poll_interval
+            idle.poll
+          case _ =>
+        }
+      }
+    }
+  }
+
+  /**
+   * Registers a load monitor for every "webadmin" service of the member.  The
+   * monitors are keyed by member id, so a later service replaces an earlier one.
+   */
+  def add(member: ClusterMemberDTO) = dispatch_queue {
+    for( service <- member.services if service.kind == "webadmin" ) {
+      members.put(member.id, LoadMonitor(member.id, new URL(service.address)))
+    }
+  }
+
+  /** Drops the load monitor registered under the member's id, if there is one. */
+  def remove(member: ClusterMemberDTO) = dispatch_queue {
+    members.remove(member.id)
+  }
+
+}
\ No newline at end of file

Added: activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/BrokerMetrics.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/BrokerMetrics.scala?rev=1347661&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/BrokerMetrics.scala (added)
+++ activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/BrokerMetrics.scala Thu Jun  7 15:04:23 2012
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.broker.network
+
+import org.apache.activemq.apollo.dto.LoadStatusDTO
+import org.apache.commons.math.stat.descriptive.DescriptiveStatistics
+import collection.mutable.HashMap
+
+/**
+ * Turns successive samples of an ever increasing counter into a rate which
+ * is averaged over a sliding window of the last 10 samples.
+ */
+class CounterDrivenRate {
+  // The previous counter sample.  It starts at Long.MaxValue so that the
+  // first sample only establishes a baseline.
+  var last_value = Long.MaxValue
+  val stats = new DescriptiveStatistics()
+  stats.setWindowSize(10)
+  // Mean of the rates currently in the window.
+  var mean = 0.0d
+
+  /**
+   * Records a new sample of the counter.
+   *
+   * @param value    the current value of the counter
+   * @param duration the time elapsed since the previous sample; the rate is
+   *                 expressed per unit of this duration
+   */
+  def update(value:Long, duration:Double) = {
+    // A value below the previous sample means the counter was reset (or this
+    // is the first sample): no rate can be derived from it.  A non-positive
+    // duration is skipped too since it would put Infinity/NaN in the window.
+    if( value >= last_value && duration > 0 ) {
+      val increment = value - last_value
+      stats.addValue(increment / duration)
+      mean = stats.getMean
+    }
+    // Always move the baseline forward.  It used to stay at the first sample,
+    // so every increment was measured against it and the rate kept growing.
+    last_value = value
+  }
+}
+
+/**
+ * The load figures tracked for a single destination of a broker.
+ */
+class DestinationMetrics {
+  var consumer_count = 0L
+  // Last reported message_size of the destination.
+  var message_size = 0L
+  // Rates derived from the destination's message_size enqueue / dequeue counters.
+  val enqueue_size_rate = new CounterDrivenRate()
+  val dequeue_size_rate = new CounterDrivenRate()
+}
+
+/**
+ * Load metrics of one broker, kept per queue id and refreshed from the load
+ * status reports of that broker.
+ */
+class BrokerMetrics() {
+
+  import collection.JavaConversions._
+  var queue_load = HashMap[String, DestinationMetrics]()
+//  var topic_load = HashMap[String, DestinationMetrics]()
+  var timestamp = System.currentTimeMillis()
+
+  /**
+   * Folds a new load status report into the metrics.  Queues which are missing
+   * from the report are dropped; the others keep their rate history.
+   */
+  def update(current:LoadStatusDTO) = {
+    // Seconds elapsed since the previous update (or since construction).
+    val sampled_at = System.currentTimeMillis()
+    val elapsed_seconds = (sampled_at - timestamp)/1000.0d
+    timestamp = sampled_at
+
+    // NOTE(review): consumer_count of the metrics is not refreshed here -- confirm
+    // whether the reported destination load carries a consumer count to copy over.
+    val refreshed = HashMap[String, DestinationMetrics]()
+    current.queues.foreach { dest =>
+      val metrics = queue_load.getOrElse(dest.id, new DestinationMetrics())
+      metrics.message_size = dest.message_size
+      metrics.enqueue_size_rate.update(dest.message_size_enqueue_counter, elapsed_seconds)
+      metrics.dequeue_size_rate.update(dest.message_size_dequeue_counter, elapsed_seconds)
+      refreshed += (dest.id -> metrics)
+    }
+    queue_load = refreshed
+
+//    var next_topic_load = HashMap[String, DestinationMetrics]()
+//    for( dest <- current.topics ) {
+//      val dest_load = topic_load.get(dest.id).getOrElse(new DestinationMetrics())
+//      dest_load.message_size = dest.message_size
+//      dest_load.enqueue_size_rate.update(dest.message_size_enqueue_counter, duration)
+//      dest_load.dequeue_size_rate.update(dest.message_size_dequeue_counter, duration)
+//      next_topic_load += dest.id -> dest_load
+//    }
+//    topic_load = next_topic_load
+  }
+}

Copied: activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/ClusterMembershipMonitor.scala (from r1341950, activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/CollectionsSupport.scala)
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/ClusterMembershipMonitor.scala?p2=activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/ClusterMembershipMonitor.scala&p1=activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/CollectionsSupport.scala&r1=1341950&r2=1347661&rev=1347661&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/CollectionsSupport.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/ClusterMembershipMonitor.scala Thu Jun  7 15:04:23 2012
@@ -14,20 +14,29 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.apollo.util
+package org.apache.activemq.apollo.broker.network
 
-/**
- * <p>
- * </p>
- *
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-object CollectionsSupport {
+import dto.ClusterMemberDTO
+import org.apache.activemq.apollo.util.{BaseService, Service}
+import org.fusesource.hawtdispatch._
+
+/**
+ * A service which tracks the members of the cluster and reports them to
+ * `listener`.
+ */
+trait ClusterMembershipMonitor extends Service {
+  // Receiver of the membership updates; starts out unset (null).
+  var listener:ClusterMembershipListener = _
+}
 
-  def diff[T](prev:Set[T], next:Set[T]):(Set[T],Set[T],Set[T]) = {
-    val updating = prev.intersect(next)
-    val adding = next -- updating
-    val removing = prev -- next
-    (adding, updating, removing)
+/**
+ * Callback interface through which a ClusterMembershipMonitor reports the
+ * cluster members.
+ */
+trait ClusterMembershipListener {
+  /** Called with the complete set of current cluster members. */
+  def on_cluster_change(members:Set[ClusterMemberDTO])
+}
+
+/**
+ * A membership monitor for a fixed set of members: on start it reports
+ * `members` to the listener once.
+ */
+case class StaticClusterMembershipMonitor(members:Set[ClusterMemberDTO]) extends BaseService with ClusterMembershipMonitor {
+  val dispatch_queue = createQueue("bridge manager")
+  // The notification is queued on this monitor's dispatch queue; start
+  // completion is signalled without waiting for it to be delivered.
+  protected def _start(on_completed: Task) = {
+    dispatch_queue {
+      listener.on_cluster_change(members)
+    }
+    on_completed.run()
+  }
+  protected def _stop(on_completed: Task) = {
+    on_completed.run()
   }
-}
\ No newline at end of file
+}

Added: activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/NetworkManager.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/NetworkManager.scala?rev=1347661&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/NetworkManager.scala (added)
+++ activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/NetworkManager.scala Thu Jun  7 15:04:23 2012
@@ -0,0 +1,227 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.broker.network
+
+import org.apache.activemq.apollo.util._
+import org.fusesource.hawtdispatch._
+import org.apache.activemq.apollo.broker.network.dto._
+import CollectionsSupport._
+import java.util.concurrent.TimeUnit._
+import collection.mutable.{LinkedHashMap, HashSet, ListBuffer, HashMap}
+import org.apache.activemq.apollo.broker.{Broker, CustomServiceFactory}
+import org.apache.activemq.apollo.dto.{LoadStatusDTO, CustomServiceDTO}
+
+/**
+ * Custom service factory which creates a NetworkManager for a
+ * NetworkManagerDTO configuration and yields null for any other kind of
+ * custom service configuration.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+object NetworkManagerFactory extends CustomServiceFactory with Log {
+  def create(broker: Broker, dto: CustomServiceDTO): Service = dto match {
+    case network_config:NetworkManagerDTO =>
+      val manager = new NetworkManager(broker)
+      manager.config = network_config
+      manager
+    case _ =>
+      // Not a configuration this factory knows about.
+      null
+  }
+}
+
+// Companion object which supplies the logging methods used by the NetworkManager class.
+object NetworkManager extends Log
+
+class NetworkManager(broker: Broker) extends BaseService with ClusterMembershipListener with BrokerLoadListener {
+  import NetworkManager._
+
+  val dispatch_queue = createQueue("bridge manager")
+
+  var config = new NetworkManagerDTO
+  var membership_monitor:ClusterMembershipMonitor = _
+  var members = Set[ClusterMemberDTO]()
+  var members_by_id = HashMap[String, ClusterMemberDTO]()
+  var load_monitor: BrokerLoadMonitor = _
+  var metrics_map = HashMap[String, BrokerMetrics]()
+  val bridges = HashMap[BridgeInfo, BridgeDeployer]()
+
+  /**
+   * Creates and starts the membership monitor and the load monitor, with this
+   * manager as the listener of both, and schedules `load_analysis` to run once
+   * per second.
+   */
+  protected def _start(on_completed: Task) = {
+    import collection.JavaConversions._
+
+    // TODO: also support dynamic membership discovery..
+    membership_monitor = StaticClusterMembershipMonitor(config.members.toSet)
+
+    membership_monitor.listener = this
+    membership_monitor.start(NOOP)
+
+    load_monitor = new RestLoadMonitor
+    load_monitor.listener = this
+    load_monitor.start(NOOP)
+
+    // NOTE(review): RestLoadMonitor._start runs on_completed before it calls
+    // schedule_reoccurring while the order is reversed here -- confirm that
+    // schedule_reoccurring does not depend on the service state at call time.
+    schedule_reoccurring(1, SECONDS) {
+      load_analysis
+    }
+    on_completed.run()
+  }
+
+  /**
+   * Stops both monitors started by `_start` and signals stop completion.
+   */
+  protected def _stop(on_completed: Task) = {
+    membership_monitor.stop(NOOP)
+    // The load monitor was started in _start as well; without stopping it the
+    // monitor would stay started after this manager has been stopped.
+    load_monitor.stop(NOOP)
+    on_completed.run()
+  }
+
+  /**
+   * Applies a new member set: members that left are removed from the load
+   * monitor, new ones are added, and the member lookup tables are replaced.
+   */
+  def on_cluster_change(value: Set[ClusterMemberDTO]) = dispatch_queue {
+    val (joined, _, departed) = diff(members, value)
+    departed.foreach(load_monitor.remove(_))
+    joined.foreach(load_monitor.add(_))
+    members = value
+    members_by_id = HashMap(value.toSeq.map(member => (member.id -> member)) : _*)
+  }
+
+  /**
+   * Folds a load status report into the metrics kept for the reporting broker,
+   * creating the metrics entry on the first report.
+   */
+  def on_load_change(dto: LoadStatusDTO) = dispatch_queue {
+    val broker_metrics = metrics_map.getOrElseUpdate(dto.id, new BrokerMetrics())
+    broker_metrics.update(dto)
+  }
+
+  /**
+   * Works out from the collected metrics which bridges are wanted, undeploys
+   * the bridges that are no longer wanted and deploys the new ones.  A bridge
+   * for a queue goes from a broker whose queue needs more consumers to a
+   * broker that has consumers on that queue.  Must run on the dispatch queue.
+   */
+  def load_analysis = {
+    dispatch_queue.assertExecuting()
+
+    // Let's remove load entries for members that were removed from the cluster.
+    val keys = metrics_map.keySet
+    val current = members.map(_.id).toSet
+    metrics_map = metrics_map -- (keys -- current)
+
+
+    // Demand for one queue.  Both lists hold (broker id, queue metrics) pairs.
+    class DemandStatus {
+      val needs_consumers = ListBuffer[(String,DestinationMetrics)]()
+      val has_consumers = ListBuffer[(String,DestinationMetrics)]()
+    }
+
+    val queue_demand_map = HashMap[String, DemandStatus]()
+
+    for( (broker, broker_load) <- metrics_map) {
+      for( (id, dest) <- broker_load.queue_load ) {
+        val status = queue_demand_map.getOrElseUpdate(id, new DemandStatus)
+        if( can_bridge_from(broker) &&  needs_more_consumers(dest) ) {
+          // The broker needs more consumers to drain the queue..
+          // Record the broker id (not the queue id): it is what ends up in the
+          // BridgeInfo and what BridgeDeployer looks up in members_by_id.
+          status.needs_consumers += (broker->dest)
+        } else {
+          // The broker can drain the queue of other brokers..
+          if( can_bridge_to(broker) && dest.consumer_count > 0 ) {
+            status.has_consumers += (broker->dest)
+          }
+        }
+      }
+    }
+
+    val desired_bridges = HashSet[BridgeInfo]()
+    for( (id, demand) <- queue_demand_map ) {
+      // Brokers in needs_consumers passed can_bridge_from, so they are the source
+      // of the bridge; brokers in has_consumers passed can_bridge_to, so they
+      // are the target.
+      for( (from, from_metrics) <- demand.needs_consumers; (to, to_metrics) <- demand.has_consumers ) {
+        // we could get fancy and compare the to_metrics and from_metrics to avoid
+        // setting up bridges that won't make a big difference..
+        desired_bridges += BridgeInfo(from, to, "queue", id)
+      }
+    }
+
+    val (bridges_added, _, bridges_removed) = diff(bridges.keySet, desired_bridges)
+
+    // Stop and remove the bridges that are no longer needed..
+    for( info <- bridges_removed ) {
+      bridges.remove(info).foreach(_.undeploy)
+    }
+
+    // Create and start the new bridges..
+    for( info <- bridges_added ) {
+      val controller = BridgeDeployer(info)
+      bridges.put(info, controller)
+      controller.deploy
+    }
+
+  }
+
+  var local_broker_id = ""
+  var enable_duplex = false
+
+  /** Only the broker whose id equals `local_broker_id` can be the source of a bridge. */
+  def can_bridge_from(broker:String):Boolean = broker==local_broker_id
+  /**
+   * Any other broker can be the target of a bridge; the local broker can only
+   * be one when `enable_duplex` is set.
+   */
+  def can_bridge_to(broker:String):Boolean = {
+    broker != local_broker_id || enable_duplex
+  }
+
+  /**
+   * Decides whether a destination would benefit from more consumers: it does
+   * when it fills up faster than it drains, or when draining what it holds at
+   * the current net drain rate would take more than 60 (seconds, as the rates
+   * are fed with durations in seconds by BrokerMetrics).
+   */
+  def needs_more_consumers(dest:DestinationMetrics):Boolean = {
+    val enqueue_rate = dest.enqueue_size_rate.mean
+    if( dest.message_size == 0 && enqueue_rate == 0 ) {
+      // nothing to drain.. so no need for consumers.
+      false
+    } else {
+      val net_drain_rate = dest.dequeue_size_rate.mean - enqueue_rate
+      if( net_drain_rate < 0 ) {
+        // Not draining...
+        true
+      } else {
+        // Might need a consumer due to being drained too slowly..
+        // (a zero rate yields Infinity or NaN here, both handled by the comparison)
+        val drain_eta_in_seconds = dest.message_size / net_drain_rate
+        drain_eta_in_seconds > 60
+      }
+    }
+  }
+
+
+  val bridging_strategies = LinkedHashMap[String, BridgingStrategy]()
+  bridging_strategies.put("stomp", new StompBridgingStrategy(this))
+
+  /**
+   * Deploys and undeploys the bridge described by `info` (whose from/to are
+   * member ids) using the most preferred bridging strategy for which both
+   * members expose a service of the matching kind.
+   */
+  case class BridgeDeployer(info:BridgeInfo) {
+
+    def to = members_by_id.get(info.to)
+    def from = members_by_id.get(info.from)
+
+    // The strategy the bridge is currently deployed with, and the address
+    // based info that was handed to it.  Both are null while not deployed.
+    var bridging_strategy:BridgingStrategy = _
+    var bridging_strategy_info : BridgeInfo = _
+
+    /**
+     * Deploys the bridge with the first usable strategy.  Does nothing when the
+     * bridge is already deployed, when either member is unknown, or when no
+     * strategy matches the services of both members.
+     */
+    def deploy:Unit = {
+      // Lets find a service kind that we can use to bridge...
+      import collection.JavaConversions._
+      if( bridging_strategy==null ) {
+        for( to_member <- to ; from_member <- from ) {
+
+          // bridging_strategies are kept in preferred order, so the first
+          // candidate is the preferred one.
+          val candidates = for(
+            (service_kind, strategy) <- bridging_strategies.toSeq;
+            to_service <- to_member.services.toSeq;
+            from_service <- from_member.services.toSeq
+            if to_service.kind==service_kind && to_service.kind==from_service.kind
+          ) yield (strategy, BridgeInfo(from_service.address, to_service.address, info.kind, info.dest))
+
+          // Deploy exactly one candidate.  Deploying every match would leave
+          // all but the last one untracked, so undeploy could never remove them.
+          for( (strategy, strategy_info) <- candidates.headOption ) {
+            bridging_strategy = strategy
+            bridging_strategy_info = strategy_info
+            bridging_strategy.deploy( bridging_strategy_info )
+          }
+        }
+      }
+    }
+
+    /** Undeploys the bridge if it is deployed. */
+    def undeploy = {
+      if( bridging_strategy!=null ) {
+        bridging_strategy.undeploy(bridging_strategy_info)
+        bridging_strategy = null
+        bridging_strategy_info = null
+      }
+    }
+  }
+
+}
+

Added: activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/StompBridgingStrategy.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/StompBridgingStrategy.scala?rev=1347661&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/StompBridgingStrategy.scala (added)
+++ activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/StompBridgingStrategy.scala Thu Jun  7 15:04:23 2012
@@ -0,0 +1,226 @@
+package org.apache.activemq.apollo.broker.network
+
+import org.fusesource.hawtdispatch._
+import java.net.URI
+import org.apache.activemq.apollo.util.{StateMachine, Log}
+import org.fusesource.hawtbuf.AsciiBuffer
+import org.fusesource.stomp.codec.StompFrame
+import org.apache.activemq.apollo.broker.Broker
+import java.util.Properties
+import org.fusesource.stomp.client.{CallbackConnection, Stomp}
+import java.util.concurrent.TimeUnit
+import collection.mutable.HashMap
+
+
+// Companion object which supplies the logging methods used by the StompBridgingStrategy class.
+object StompBridgingStrategy extends Log {
+}
+
+class StompBridgingStrategy(val manager:NetworkManager) extends BridgingStrategy {
+  import StompBridgingStrategy._
+  import org.fusesource.stomp.client.Constants._
+  import org.fusesource.hawtbuf.Buffer._
+  def dispatch_queue = manager.dispatch_queue
+
+  val bridges = HashMap[(String, String), Bridge]()
+
+  /**
+   * Deploys the destination of `info` on the bridge between its two endpoints,
+   * creating that bridge on first use.  Must run on the manager's dispatch queue.
+   */
+  def deploy(info:BridgeInfo) = {
+    dispatch_queue.assertExecuting()
+    val endpoints = (info.from, info.to)
+    bridges.getOrElseUpdate(endpoints, new Bridge(info.from, info.to)).deploy(info.kind, info.dest)
+  }
+
+
+  /**
+   * Undeploys the destination of `info` from the bridge between its two
+   * endpoints, if such a bridge exists.  The bridge itself is kept.  Must run
+   * on the manager's dispatch queue.
+   */
+  def undeploy(info:BridgeInfo) = {
+    dispatch_queue.assertExecuting()
+    bridges.get((info.from, info.to)).foreach(_.undeploy(info.kind, info.dest))
+  }
+
+  class Bridge(from:String, to:String) {
+    val dispatch_queue = createQueue("bridge %s -> %s".format(from, to))
+
+    val from_connection = ConnectionStateMachine(new URI(from))
+    val to_connection = ConnectionStateMachine(new URI(to))
+
+    from_connection.refiller = ^{
+
+    }
+
+    dispatch_queue {
+      from_connection.connect
+      to_connection.connect
+    }
+
+    case class ConnectionStateMachine(uri:URI) extends StateMachine {
+
+      var id_counter = 0L
+      var subscriptions = HashMap[AsciiBuffer, AsciiBuffer]()
+      var pending_sends = HashMap[Long, (StompFrame, ()=>Unit)]()
+
+      var refiller: Runnable = ^{ sys.error("refiller not set") }
+      var receive_handler: (StompFrame)=>Boolean = frame => {
+        info("dropping frame: %s", frame)
+        true
+      }
+
+
+      /** Hands out the current value of the id counter and increments the counter. */
+      def next_id = {
+        id_counter += 1
+        id_counter - 1
+      }
+
+      /** The next id, rendered in hexadecimal as an ascii buffer. */
+      def next_hex_id = {
+        ascii(next_id.toHexString)
+      }
+
+
+      def init() = DisconnectedState()
+
+      /** Initial state: nothing happens until a connect is requested. */
+      case class DisconnectedState() extends State {
+        def connect = react {
+          become(ConnectingState())
+        }
+      }
+
+      /**
+       * State in which a STOMP connection to `uri` is being established.  Leads
+       * to ConnectedState on success and to ReconnectDelayState on failure.
+       */
+      case class ConnectingState() extends State {
+        override def init() = {
+          debug("Connecting bridge to %s", uri)
+          val to_stomp = new Stomp()
+          to_stomp.setDispatchQueue(dispatch_queue)
+          to_stomp.setRemoteURI(uri)
+          // NOTE(review): the credentials are hard coded -- they should probably
+          // come from the network manager configuration.
+          to_stomp.setLogin("admin")
+          to_stomp.setPasscode("password")
+          to_stomp.setBlockingExecutor(Broker.BLOCKABLE_THREAD_POOL)
+          val headers = new Properties()
+          headers.put("client-type", "apollo-bridge")
+          to_stomp.setCustomHeaders(headers)
+
+          to_stomp.connectCallback(new org.fusesource.stomp.client.Callback[CallbackConnection] {
+            override def onSuccess(value: CallbackConnection) = react {
+              become(ConnectedState(value))
+            }
+            override def onFailure(value: Throwable) = react {
+              // The format needs a placeholder for the failure; without it the
+              // second argument never makes it into the message.
+              debug("Could not connect bridge to %s due to: %s", uri, value)
+              become(ReconnectDelayState(1000))
+            }
+          })
+        }
+      }
+
+      /**
+       * State which waits `delay` milliseconds and then goes back to
+       * ConnectingState.
+       */
+      case class ReconnectDelayState(delay:Long) extends State {
+        override def init() = {
+          debug("Will attempt a reconnect to %s in %d ms", uri, delay)
+          dispatch_queue.after(delay, TimeUnit.MILLISECONDS) {
+            react(become(ConnectingState()))
+          }
+        }
+      }
+
+      /**
+       * State entered once the STOMP connection is established.  On entry the
+       * known subscriptions are re-issued and the pending sends are sent again.
+       * A failure closes the connection and leads to ReconnectDelayState.
+       */
+      case class ConnectedState(connection:CallbackConnection) extends State {
+
+        // Set once close() has started closing the connection.
+        var closed  = false
+
+        override def init() = {
+          debug("Bridge connected to: %s", uri)
+          connection.receive(new org.fusesource.stomp.client.Callback[StompFrame] {
+            override def onSuccess(value: StompFrame) = {
+              // A receive handler which returns false gets the connection suspended.
+              if( !receive_handler(value) ) {
+                connection.suspend()
+              }
+            }
+            override def onFailure(value: Throwable) = {
+              failed(value)
+            }
+          })
+          connection.refiller(refiller)
+
+          // Reconnect any subscriptions.
+          subscriptions.keySet.foreach(subscribe(_))
+          // Re-send messages..
+          pending_sends.values.foreach(x => do_send(x._1, x._2))
+
+        }
+
+        /** Sends the frame as a request; `on_complete` runs once the response arrives. */
+        def do_send(frame:StompFrame, on_complete: ()=>Unit) = {
+          connection.request(frame, new org.fusesource.stomp.client.Callback[StompFrame] {
+            override def onSuccess(response: StompFrame) = on_complete()
+            override def onFailure(value: Throwable) = failed(value)
+          })
+        }
+
+        def failed(value: Throwable)= {
+          // The format needs a placeholder for the failure; without it the
+          // second argument never makes it into the message.
+          debug("Bridge connection to %s failed due to: %s", uri, value)
+          close(ReconnectDelayState(1000))
+        }
+
+        /**
+         * Closes the connection and moves on to `next` once it is closed.  When
+         * a close was already started, `next` is entered directly.
+         */
+        def close(next: State) = {
+          if( closed ) {
+            become(next)
+          } else {
+            closed = true
+            val pause = Pause(next)
+            become(pause)
+            debug("Closing connection to %s", uri)
+            connection.close(^{
+              debug("Closed connection to %s", uri)
+              pause.continue
+            })
+          }
+        }
+      }
+
+      // Requests that only act in a particular state: connect reacts in
+      // DisconnectedState, the others react in ConnectedState and operate on
+      // its connection.
+      def connect = react[DisconnectedState] { state => state.connect }
+      def for_connection(func: (CallbackConnection)=>Unit) = react[ConnectedState] { state => func(state.connection) }
+      def suspend = for_connection { connection => connection.suspend() }
+      def resume = for_connection { connection => connection.resume() }
+
+      /**
+       * Remembers a subscription on the destination (allocating a subscription
+       * id on first use) and sends a SUBSCRIBE frame with client ack mode for
+       * it over the connection.
+       */
+      def subscribe(destination:AsciiBuffer) = {
+        val id = subscriptions.getOrElseUpdate(destination, next_hex_id)
+        for_connection{ connection =>
+          val frame = new StompFrame(SUBSCRIBE)
+          frame.addHeader(ID, id)
+          frame.addHeader(DESTINATION, destination)
+          frame.addHeader(ACK_MODE, CLIENT)
+          connection.send(frame, null)
+        }
+      }
+
+      /**
+       * Forgets the subscription on the destination and, if there was one,
+       * sends an UNSUBSCRIBE frame for its id over the connection.
+       */
+      def unsubscribe(destination:AsciiBuffer) = {
+        subscriptions.remove(destination) match {
+          case Some(id) =>
+            for_connection{ connection =>
+              val frame = new StompFrame(UNSUBSCRIBE)
+              frame.addHeader(ID, id)
+              connection.send(frame, null)
+            }
+          case None =>
+        }
+      }
+
+      /**
+       * Sends a frame (the `destination` parameter is the frame to send) and
+       * runs `on_complete` once it completed.  Until then the frame is kept in
+       * pending_sends, from where ConnectedState sends it again on entry.
+       */
+      def send(destination:StompFrame, on_complete: ()=>Unit) = {
+        val id = next_id
+        // Completion first removes the frame from the pending sends.
+        val cb = ()=>{
+          pending_sends.remove(id)
+          on_complete()
+        }
+        pending_sends.put(id, (destination, cb))
+        react[ConnectedState] { state => state.do_send(destination, cb) }
+      }
+    }
+
+    /** Subscribes the `from` connection to the destination "/kind/destination". */
+    def deploy(kind:String, destination:String) = dispatch_queue {
+      from_connection.subscribe(ascii("/%s/%s".format(kind, destination)))
+    }
+
+    /** Unsubscribes the `from` connection from the destination "/kind/destination". */
+    def undeploy(kind:String, destination:String) = dispatch_queue {
+      from_connection.unsubscribe(ascii("/%s/%s".format(kind, destination)))
+    }
+
+  }
+
+}
+

Copied: activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/BridgeDTO.java (from r1341950, activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/CollectionsSupport.scala)
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/BridgeDTO.java?p2=activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/BridgeDTO.java&p1=activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/CollectionsSupport.scala&r1=1341950&r2=1347661&rev=1347661&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/CollectionsSupport.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/BridgeDTO.java Thu Jun  7 15:04:23 2012
@@ -14,20 +14,22 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.apollo.util
+package org.apache.activemq.apollo.broker.network.dto;
+
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+
+import javax.xml.bind.annotation.*;
+import java.util.ArrayList;
 
 /**
- * <p>
- * </p>
- *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-object CollectionsSupport {
+@XmlRootElement(name="bridge")
+@XmlAccessorType(XmlAccessType.FIELD)
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class BridgeDTO {
+
+    /** The rules of the bridge; an empty list by default. */
+    @XmlElementRef(name="rule")
+    public ArrayList<RuleDTO> rules = new ArrayList<RuleDTO>();
 
-  def diff[T](prev:Set[T], next:Set[T]):(Set[T],Set[T],Set[T]) = {
-    val updating = prev.intersect(next)
-    val adding = next -- updating
-    val removing = prev -- next
-    (adding, updating, removing)
-  }
-}
\ No newline at end of file
+}

Copied: activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/ClusterMemberDTO.java (from r1341950, activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/CollectionsSupport.scala)
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/ClusterMemberDTO.java?p2=activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/ClusterMemberDTO.java&p1=activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/CollectionsSupport.scala&r1=1341950&r2=1347661&rev=1347661&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/CollectionsSupport.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/ClusterMemberDTO.java Thu Jun  7 15:04:23 2012
@@ -14,20 +14,24 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.apollo.util
+package org.apache.activemq.apollo.broker.network.dto;
+
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+
+import javax.xml.bind.annotation.*;
+import java.util.ArrayList;
 
 /**
- * <p>
- * </p>
- *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-object CollectionsSupport {
+@XmlAccessorType(XmlAccessType.FIELD)
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class ClusterMemberDTO {
+
+    /** Identifier of the cluster member. */
+    @XmlAttribute(name="id")
+    public String id;
+
+    /** The services exposed by the member; an empty list by default. */
+    @XmlElementRef(name="service")
+    public ArrayList<ClusterServiceDTO> services = new ArrayList<ClusterServiceDTO>();
 
-  def diff[T](prev:Set[T], next:Set[T]):(Set[T],Set[T],Set[T]) = {
-    val updating = prev.intersect(next)
-    val adding = next -- updating
-    val removing = prev -- next
-    (adding, updating, removing)
-  }
-}
\ No newline at end of file
+}

Copied: activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/ClusterServiceDTO.java (from r1341950, activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/CollectionsSupport.scala)
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/ClusterServiceDTO.java?p2=activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/ClusterServiceDTO.java&p1=activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/CollectionsSupport.scala&r1=1341950&r2=1347661&rev=1347661&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/CollectionsSupport.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/ClusterServiceDTO.java Thu Jun  7 15:04:23 2012
@@ -14,20 +14,27 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.apollo.util
+package org.apache.activemq.apollo.broker.network.dto;
+
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlAttribute;
+import javax.xml.bind.annotation.XmlRootElement;
 
 /**
- * <p>
- * </p>
- *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-object CollectionsSupport {
+@XmlRootElement(name="cluster_service")
+@XmlAccessorType(XmlAccessType.FIELD)
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class ClusterServiceDTO {
+
+    @XmlAttribute(name="kind")
+    public String kind;
+
+    @XmlAttribute(name="address")
+    public String address;
 
-  def diff[T](prev:Set[T], next:Set[T]):(Set[T],Set[T],Set[T]) = {
-    val updating = prev.intersect(next)
-    val adding = next -- updating
-    val removing = prev -- next
-    (adding, updating, removing)
-  }
-}
\ No newline at end of file
+}

Copied: activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/Module.scala (from r1341950, activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/CollectionsSupport.scala)
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/Module.scala?p2=activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/Module.scala&p1=activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/CollectionsSupport.scala&r1=1341950&r2=1347661&rev=1347661&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/CollectionsSupport.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/Module.scala Thu Jun  7 15:04:23 2012
@@ -1,3 +1,5 @@
+package org.apache.activemq.apollo.broker.network.dto
+
 /**
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
@@ -14,20 +16,14 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.apollo.util
+import org.apache.activemq.apollo.util.DtoModule
 
 /**
- * <p>
- * </p>
- *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-object CollectionsSupport {
+class Module extends DtoModule {
+
+  def dto_package = "org.apache.activemq.apollo.broker.network.dto"
+  def extension_classes = Array(classOf[NetworkManagerDTO])
 
-  def diff[T](prev:Set[T], next:Set[T]):(Set[T],Set[T],Set[T]) = {
-    val updating = prev.intersect(next)
-    val adding = next -- updating
-    val removing = prev -- next
-    (adding, updating, removing)
-  }
 }
\ No newline at end of file

Copied: activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/NetworkManagerDTO.java (from r1341950, activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/CollectionsSupport.scala)
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/NetworkManagerDTO.java?p2=activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/NetworkManagerDTO.java&p1=activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/CollectionsSupport.scala&r1=1341950&r2=1347661&rev=1347661&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/CollectionsSupport.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/NetworkManagerDTO.java Thu Jun  7 15:04:23 2012
@@ -14,20 +14,26 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.apollo.util
+package org.apache.activemq.apollo.broker.network.dto;
+
+import org.apache.activemq.apollo.dto.CustomServiceDTO;
+import org.apache.activemq.apollo.dto.ServiceDTO;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+
+import javax.xml.bind.annotation.*;
+import java.util.ArrayList;
 
 /**
- * <p>
- * </p>
- *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-object CollectionsSupport {
+@XmlRootElement(name="network_manager")
+@XmlAccessorType(XmlAccessType.FIELD)
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class NetworkManagerDTO extends CustomServiceDTO {
+
+//    @XmlElementRef(name="bridge")
+//    public ArrayList<BridgeDTO> bridges = new ArrayList<BridgeDTO>();
 
-  def diff[T](prev:Set[T], next:Set[T]):(Set[T],Set[T],Set[T]) = {
-    val updating = prev.intersect(next)
-    val adding = next -- updating
-    val removing = prev -- next
-    (adding, updating, removing)
-  }
-}
\ No newline at end of file
+    @XmlElement(name="member")
+    public ArrayList<ClusterMemberDTO> members = new ArrayList<ClusterMemberDTO>();
+}

Copied: activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/RuleDTO.java (from r1341950, activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/CollectionsSupport.scala)
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/RuleDTO.java?p2=activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/RuleDTO.java&p1=activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/CollectionsSupport.scala&r1=1341950&r2=1347661&rev=1347661&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/CollectionsSupport.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/RuleDTO.java Thu Jun  7 15:04:23 2012
@@ -14,20 +14,30 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.apollo.util
+
+package org.apache.activemq.apollo.broker.network.dto;
+
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+
+import javax.xml.bind.annotation.*;
 
 /**
- * <p>
- * </p>
+ * A rule described by its include, exclude and kind attributes.
  *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-object CollectionsSupport {
+@XmlRootElement(name="rule")
+@XmlAccessorType(XmlAccessType.FIELD)
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class RuleDTO {
+
+    @XmlAttribute(name="include")
+    public String include;
+
+    @XmlAttribute(name="exclude")
+    public String exclude;
+
+    @XmlAttribute(name="kind")
+    public String kind;
 
-  def diff[T](prev:Set[T], next:Set[T]):(Set[T],Set[T],Set[T]) = {
-    val updating = prev.intersect(next)
-    val adding = next -- updating
-    val removing = prev -- next
-    (adding, updating, removing)
-  }
-}
\ No newline at end of file
+}

Copied: activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/package-info.java (from r1341950, activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/CollectionsSupport.scala)
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/package-info.java?p2=activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/package-info.java&p1=activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/CollectionsSupport.scala&r1=1341950&r2=1347661&rev=1347661&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/CollectionsSupport.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/package-info.java Thu Jun  7 15:04:23 2012
@@ -14,20 +14,15 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.apollo.util
 
 /**
- * <p>
- * </p>
- *
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ * The JAXB POJOs for the
+ * network manager portion of the
+ * <a href="http://activemq.apache.org/schema/activemq/apollo/xml-configuration.html">XML Configuration</a>
+ * of the ActiveMQ Broker.
  */
-object CollectionsSupport {
+@javax.xml.bind.annotation.XmlSchema(
+        namespace = "http://activemq.apache.org/schema/activemq/apollo",
+        elementFormDefault = javax.xml.bind.annotation.XmlNsForm.QUALIFIED)
+package org.apache.activemq.apollo.broker.network.dto;
 
-  def diff[T](prev:Set[T], next:Set[T]):(Set[T],Set[T],Set[T]) = {
-    val updating = prev.intersect(next)
-    val adding = next -- updating
-    val removing = prev -- next
-    (adding, updating, removing)
-  }
-}
\ No newline at end of file

Added: activemq/activemq-apollo/trunk/apollo-network/src/test/resources/apollo-network-1.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-network/src/test/resources/apollo-network-1.xml?rev=1347661&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-network/src/test/resources/apollo-network-1.xml (added)
+++ activemq/activemq-apollo/trunk/apollo-network/src/test/resources/apollo-network-1.xml Thu Jun  7 15:04:23 2012
@@ -0,0 +1,40 @@
+<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<broker xmlns="http://activemq.apache.org/schema/activemq/apollo">
+  <notes>This broker configuration is what the unit tests in this module load up.</notes>
+
+  <virtual_host id="broker1">
+    <host_name>localhost</host_name>
+  </virtual_host>
+
+  <!-- once we get dynamic discovery working we can avoid using fixed ports -->
+  <web_admin bind="http://0.0.0.0:41000"/>
+  <connector id="tcp" bind="tcp://0.0.0.0:41001"/>
+
+  <network_manager>
+    <member id="broker1">
+      <service kind="web_admin" address="http://0.0.0.0:41000"/>
+      <service kind="stomp" address="tcp://0.0.0.0:41001"/>
+    </member>
+    <member id="broker2">
+      <service kind="web_admin" address="http://0.0.0.0:42000"/>
+      <service kind="stomp" address="tcp://0.0.0.0:42001"/>
+    </member>
+  </network_manager>
+
+</broker>
\ No newline at end of file

Added: activemq/activemq-apollo/trunk/apollo-network/src/test/resources/apollo-network-2.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-network/src/test/resources/apollo-network-2.xml?rev=1347661&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-network/src/test/resources/apollo-network-2.xml (added)
+++ activemq/activemq-apollo/trunk/apollo-network/src/test/resources/apollo-network-2.xml Thu Jun  7 15:04:23 2012
@@ -0,0 +1,40 @@
+<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<broker xmlns="http://activemq.apache.org/schema/activemq/apollo">
+  <notes>This broker configuration is what the unit tests in this module load up.</notes>
+
+  <virtual_host id="broker2">
+    <host_name>localhost</host_name>
+  </virtual_host>
+
+  <!-- once we get dynamic discovery working we can avoid using fixed ports -->
+  <web_admin bind="http://0.0.0.0:42000"/>
+  <connector id="tcp" bind="tcp://0.0.0.0:42001"/>
+
+  <network_manager>
+    <member id="broker1">
+      <service kind="web_admin" address="http://0.0.0.0:41000"/>
+      <service kind="stomp" address="tcp://0.0.0.0:41001"/>
+    </member>
+    <member id="broker2">
+      <service kind="web_admin" address="http://0.0.0.0:42000"/>
+      <service kind="stomp" address="tcp://0.0.0.0:42001"/>
+    </member>
+  </network_manager>
+
+</broker>
\ No newline at end of file

Added: activemq/activemq-apollo/trunk/apollo-network/src/test/resources/log4j.properties
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-network/src/test/resources/log4j.properties?rev=1347661&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-network/src/test/resources/log4j.properties (added)
+++ activemq/activemq-apollo/trunk/apollo-network/src/test/resources/log4j.properties Thu Jun  7 15:04:23 2012
@@ -0,0 +1,36 @@
+#
+# Copyright (C) 2012 FuseSource Corp. All rights reserved.
+# http://fusesource.com
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+#
+# The logging properties used during tests.
+#
+log4j.rootLogger=WARN, console, file
+log4j.logger.org.apache.activemq=INFO
+log4j.logger.org.fusesource=INFO
+
+# Console will only display warnings
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%-5p | %t | %m%n
+log4j.appender.console.threshold=TRACE
+
+# File appender will contain all info messages
+log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d | %-5p | %m | %c | %t%n
+log4j.appender.file.file=target/test.log
+log4j.appender.file.append=true

Propchange: activemq/activemq-apollo/trunk/apollo-network/src/test/resources/log4j.properties
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/activemq-apollo/trunk/apollo-network/src/test/scala/org/apache/activemq/apollo/broker/network/NetworkTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-network/src/test/scala/org/apache/activemq/apollo/broker/network/NetworkTest.scala?rev=1347661&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-network/src/test/scala/org/apache/activemq/apollo/broker/network/NetworkTest.scala (added)
+++ activemq/activemq-apollo/trunk/apollo-network/src/test/scala/org/apache/activemq/apollo/broker/network/NetworkTest.scala Thu Jun  7 15:04:23 2012
@@ -0,0 +1,21 @@
+package org.apache.activemq.apollo.broker.network
+
+import org.scalatest.matchers.ShouldMatchers
+import org.scalatest.BeforeAndAfterEach
+import org.apache.activemq.apollo.broker.MultiBrokerTestSupport
+
+
+class NetworkTest extends MultiBrokerTestSupport with ShouldMatchers with BeforeAndAfterEach {
+
+  override def broker_config_uris = Array(
+    "xml:classpath:apollo-network-1.xml",
+    "xml:classpath:apollo-network-2.xml"
+  )
+
+  test("basics") {
+    admins(0).broker should not be(null)
+    val config = admins(0).broker.config
+    admins(1).broker should not be(null)
+  }
+
+}
\ No newline at end of file

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/SecurityTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/SecurityTest.scala?rev=1347661&r1=1347660&r2=1347661&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/SecurityTest.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/SecurityTest.scala Thu Jun  7 15:04:23 2012
@@ -23,7 +23,7 @@ abstract class SecurityTest extends Open
 
   override val broker_config_uri: String = "xml:classpath:apollo-openwire-secure.xml"
 
-  override protected def beforeAll = {
+  override def beforeAll = {
     try {
       val login_file = new java.io.File(getClass.getClassLoader.getResource("login.config").getFile())
       System.setProperty("java.security.auth.login.config", login_file.getCanonicalPath)

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/SslSecurityTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/SslSecurityTest.scala?rev=1347661&r1=1347660&r2=1347661&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/SslSecurityTest.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/SslSecurityTest.scala Thu Jun  7 15:04:23 2012
@@ -24,7 +24,7 @@ class SslSecurityTest extends OpenwireTe
   override val broker_config_uri: String = "xml:classpath:apollo-openwire-ssl-secure.xml"
   override val transport_scheme = "ssl"
 
-  override protected def beforeAll = {
+  override def beforeAll = {
     // System.setProperty("javax.net.debug", "all")
     try {
       val login_file = new java.io.File(getClass.getClassLoader.getResource("login.config").getFile())

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala?rev=1347661&r1=1347660&r2=1347661&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala Thu Jun  7 15:04:23 2012
@@ -1898,7 +1898,7 @@ class StompSecurityTest extends StompTes
 
   override def broker_config_uri: String = "xml:classpath:apollo-stomp-secure.xml"
 
-  override protected def beforeAll = {
+  override def beforeAll = {
     try {
       val login_file = new java.io.File(getClass.getClassLoader.getResource("login.config").getFile())
       System.setProperty("java.security.auth.login.config", login_file.getCanonicalPath)
@@ -2102,7 +2102,7 @@ class StompSslSecurityTest extends Stomp
 
   override def broker_config_uri: String = "xml:classpath:apollo-stomp-ssl-secure.xml"
 
-  override protected def beforeAll = {
+  override def beforeAll = {
     // System.setProperty("javax.net.debug", "all")
     try {
       val login_file = new java.io.File(getClass.getClassLoader.getResource("login.config").getFile())

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompWebTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompWebTest.scala?rev=1347661&r1=1347660&r2=1347661&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompWebTest.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompWebTest.scala Thu Jun  7 15:04:23 2012
@@ -72,7 +72,7 @@ abstract class StompWebSocketTestSupport
 
   def ws_port: Int = connector_port("ws").get
 
-  override protected def beforeAll() = {
+  override def beforeAll() = {
     try {
       driver = create_web_driver(test_data_dir / "profile")
       super.beforeAll()
@@ -82,7 +82,7 @@ abstract class StompWebSocketTestSupport
     }
   }
 
-  override protected def afterAll() = {
+  override def afterAll() = {
     if (driver != null) {
       driver.quit()
       driver = null