You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/06/07 17:04:25 UTC
svn commit: r1347661 [1/2] - in /activemq/activemq-apollo/trunk: ./
apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/
apollo-broker/src/test/scala/
apollo-dto/src/main/java/org/apache/activemq/apollo/dto/
apollo-dto/src/main/resources/org...
Author: chirino
Date: Thu Jun 7 15:04:23 2012
New Revision: 1347661
URL: http://svn.apache.org/viewvc?rev=1347661&view=rev
Log:
Adding an optional network manager which handles forwarding of messages between brokers to allow the creation of broker networks/federations.
Added:
activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationLoadDTO.java
activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/LoadStatusDTO.java
- copied, changed from r1341950, activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/CollectionsSupport.scala
activemq/activemq-apollo/trunk/apollo-network/
activemq/activemq-apollo/trunk/apollo-network/pom.xml
activemq/activemq-apollo/trunk/apollo-network/src/
activemq/activemq-apollo/trunk/apollo-network/src/main/
activemq/activemq-apollo/trunk/apollo-network/src/main/resources/
activemq/activemq-apollo/trunk/apollo-network/src/main/resources/META-INF/
activemq/activemq-apollo/trunk/apollo-network/src/main/resources/META-INF/services/
activemq/activemq-apollo/trunk/apollo-network/src/main/resources/META-INF/services/org.apache.activemq.apollo/
activemq/activemq-apollo/trunk/apollo-network/src/main/resources/META-INF/services/org.apache.activemq.apollo/custom-service-factory.index
activemq/activemq-apollo/trunk/apollo-network/src/main/resources/META-INF/services/org.apache.activemq.apollo/dto-module.index
activemq/activemq-apollo/trunk/apollo-network/src/main/resources/org/
activemq/activemq-apollo/trunk/apollo-network/src/main/resources/org/apache/
activemq/activemq-apollo/trunk/apollo-network/src/main/resources/org/apache/activemq/
activemq/activemq-apollo/trunk/apollo-network/src/main/resources/org/apache/activemq/apollo/
activemq/activemq-apollo/trunk/apollo-network/src/main/resources/org/apache/activemq/apollo/broker/
activemq/activemq-apollo/trunk/apollo-network/src/main/resources/org/apache/activemq/apollo/broker/network/
activemq/activemq-apollo/trunk/apollo-network/src/main/resources/org/apache/activemq/apollo/broker/network/dto/
activemq/activemq-apollo/trunk/apollo-network/src/main/resources/org/apache/activemq/apollo/broker/network/dto/jaxb.index
activemq/activemq-apollo/trunk/apollo-network/src/main/scala/
activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/
activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/
activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/
activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/
activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/
activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/
activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/BridgingStrategy.scala
- copied, changed from r1341950, activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/CollectionsSupport.scala
activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/BrokerLoadMonitor.scala
activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/BrokerMetrics.scala
activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/ClusterMembershipMonitor.scala
- copied, changed from r1341950, activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/CollectionsSupport.scala
activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/NetworkManager.scala
activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/StompBridgingStrategy.scala
activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/
activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/BridgeDTO.java
- copied, changed from r1341950, activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/CollectionsSupport.scala
activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/ClusterMemberDTO.java
- copied, changed from r1341950, activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/CollectionsSupport.scala
activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/ClusterServiceDTO.java
- copied, changed from r1341950, activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/CollectionsSupport.scala
activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/Module.scala
- copied, changed from r1341950, activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/CollectionsSupport.scala
activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/NetworkManagerDTO.java
- copied, changed from r1341950, activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/CollectionsSupport.scala
activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/RuleDTO.java
- copied, changed from r1341950, activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/CollectionsSupport.scala
activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/package-info.java
- copied, changed from r1341950, activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/CollectionsSupport.scala
activemq/activemq-apollo/trunk/apollo-network/src/test/
activemq/activemq-apollo/trunk/apollo-network/src/test/resources/
activemq/activemq-apollo/trunk/apollo-network/src/test/resources/apollo-network-1.xml
activemq/activemq-apollo/trunk/apollo-network/src/test/resources/apollo-network-2.xml
activemq/activemq-apollo/trunk/apollo-network/src/test/resources/log4j.properties (with props)
activemq/activemq-apollo/trunk/apollo-network/src/test/scala/
activemq/activemq-apollo/trunk/apollo-network/src/test/scala/org/
activemq/activemq-apollo/trunk/apollo-network/src/test/scala/org/apache/
activemq/activemq-apollo/trunk/apollo-network/src/test/scala/org/apache/activemq/
activemq/activemq-apollo/trunk/apollo-network/src/test/scala/org/apache/activemq/apollo/
activemq/activemq-apollo/trunk/apollo-network/src/test/scala/org/apache/activemq/apollo/broker/
activemq/activemq-apollo/trunk/apollo-network/src/test/scala/org/apache/activemq/apollo/broker/network/
activemq/activemq-apollo/trunk/apollo-network/src/test/scala/org/apache/activemq/apollo/broker/network/NetworkTest.scala
activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/StateMachine.scala
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/BrokerFunSuiteSupport.scala
activemq/activemq-apollo/trunk/apollo-dto/src/main/resources/org/apache/activemq/apollo/dto/jaxb.index
activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/SecurityTest.scala
activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/SslSecurityTest.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompWebTest.scala
activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/BaseService.scala
activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/CollectionsSupport.scala
activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/BrokerResource.scala
activemq/activemq-apollo/trunk/pom.xml
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala?rev=1347661&r1=1347660&r2=1347661&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala Thu Jun 7 15:04:23 2012
@@ -33,7 +33,7 @@ import management.ManagementFactory
import org.apache.activemq.apollo.dto._
import javax.management.ObjectName
import org.fusesource.hawtdispatch.TaskTracker._
-import java.util.concurrent.TimeUnit
+import java.util.concurrent.TimeUnit._
import security.SecuredResource.BrokerKind
import reflect.BeanProperty
import java.net.InetSocketAddress
@@ -157,6 +157,9 @@ object Broker extends Log {
val buffer_pools = new BufferPools
def class_loader:ClassLoader = ClassFinder.class_loader
+
+ @volatile
+ var now = System.currentTimeMillis()
val version = using(getClass().getResourceAsStream("version.txt")) { source=>
read_text(source).trim
@@ -259,7 +262,7 @@ class Broker() extends BaseService with
var web_server:WebServer = _
@volatile
- var now = System.currentTimeMillis()
+ def now = Broker.now
var config_log:Log = Log(new MemoryLogger(Broker.log))
var audit_log:Log = Broker
@@ -297,8 +300,12 @@ class Broker() extends BaseService with
check_file_limit
BrokerRegistry.add(this)
- schedule_now_update
- schedule_virtualhost_maintenance
+ schedule_reoccurring(100, MILLISECONDS) {
+ Broker.now = System.currentTimeMillis
+ }
+ schedule_reoccurring(1, SECONDS) {
+ virtualhost_maintenance
+ }
val tracker = new LoggingTracker("broker startup", console_log, SERVICE_TIMEOUT)
apply_update(tracker)
@@ -340,28 +347,17 @@ class Broker() extends BaseService with
}
- def schedule_now_update:Unit = dispatch_queue.after(100, TimeUnit.MILLISECONDS) {
- if( service_state.is_starting_or_started ) {
- now = System.currentTimeMillis
- schedule_now_update
- }
- }
-
- def schedule_virtualhost_maintenance:Unit = dispatch_queue.after(1, TimeUnit.SECONDS) {
- if( service_state.is_started ) {
- val active_sessions = connections.values.flatMap(_.session_id).toSet
-
- virtual_hosts.values.foreach { host=>
- host.dispatch_queue {
- if(host.service_state.is_started) {
- host.router.remove_temp_destinations(active_sessions)
- }
+ def virtualhost_maintenance = {
+ val active_sessions = connections.values.flatMap(_.session_id).toSet
+ virtual_hosts.values.foreach { host=>
+ host.dispatch_queue {
+ if(host.service_state.is_started) {
+ host.router.remove_temp_destinations(active_sessions)
}
}
-
- schedule_virtualhost_maintenance
}
}
+
protected def init_logs = {
import OptionSupport._
// Configure the logging categories...
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1347661&r1=1347660&r2=1347661&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Thu Jun 7 15:04:23 2012
@@ -308,6 +308,21 @@ class Queue(val router: LocalRouter, val
rc
}
+ def load_status = {
+ val rc = new DestinationLoadDTO
+ rc.id = this.id
+ rc.message_count = queue_size
+ rc.message_size = queue_items
+ rc.message_count_enqueue_counter = enqueue_item_counter
+ rc.message_size_enqueue_counter = enqueue_size_counter
+ rc.message_count_dequeue_counter = dequeue_item_counter
+ rc.message_size_dequeue_counter = dequeue_size_counter
+// TODO: expose selector attribute of consumer.
+// for( consumer <- all_subscriptions.keys ) {
+// rc.consumer_selectors.add(consumer.selector)
+// }
+ rc
+ }
def status(entries:Boolean=false) = {
val rc = new QueueStatusDTO
rc.id = this.id
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/BrokerFunSuiteSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/BrokerFunSuiteSupport.scala?rev=1347661&r1=1347660&r2=1347661&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/BrokerFunSuiteSupport.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/BrokerFunSuiteSupport.scala Thu Jun 7 15:04:23 2012
@@ -43,7 +43,7 @@ class BrokerFunSuiteSupport extends FunS
broker
}
- override protected def beforeAll() = {
+ override def beforeAll() = {
super.beforeAll()
try {
broker = createBroker
@@ -54,7 +54,7 @@ class BrokerFunSuiteSupport extends FunS
}
}
- override protected def afterAll() = {
+ override def afterAll() = {
ServiceControl.stop(broker)
super.afterAll()
}
@@ -142,3 +142,34 @@ class BrokerFunSuiteSupport extends FunS
}
}
+
+class MultiBrokerTestSupport extends FunSuiteSupport {
+
+ case class BrokerAdmin(override val broker_config_uri:String) extends BrokerFunSuiteSupport
+
+ def broker_config_uris = Array("xml:classpath:apollo.xml")
+ var admins = Array[BrokerAdmin]()
+
+ override protected def beforeAll() = {
+ super.beforeAll()
+ try {
+ admins = broker_config_uris.map(BrokerAdmin(_))
+ admins.foreach(_.beforeAll)
+ } catch {
+ case e: Throwable => e.printStackTrace
+ }
+ }
+
+ override protected def afterAll() = {
+ for( admin <- admins ) {
+ try {
+ admin.afterAll
+ } catch {
+ case e => debug(e)
+ }
+ }
+ admins = Array()
+ super.afterAll()
+ }
+
+}
\ No newline at end of file
Added: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationLoadDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationLoadDTO.java?rev=1347661&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationLoadDTO.java (added)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationLoadDTO.java Thu Jun 7 15:04:23 2012
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.apollo.dto;
+
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+
+import javax.xml.bind.annotation.*;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+
+/**
+ * The
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+@XmlRootElement(name="destination_load")
+@XmlAccessorType(XmlAccessType.FIELD)
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class DestinationLoadDTO {
+
+ @XmlAttribute(name="id")
+ public String id;
+
+ @XmlAttribute(name="message_count")
+ public Long message_count;
+
+ @XmlAttribute(name="message_size")
+ public Long message_size;
+
+ @XmlAttribute(name="message_count_enqueue_counter")
+ public Long message_count_enqueue_counter;
+
+ @XmlAttribute(name="message_size_enqueue_counter")
+ public Long message_size_enqueue_counter;
+
+ @XmlAttribute(name="message_count_dequeue_counter")
+ public Long message_count_dequeue_counter;
+
+ @XmlAttribute(name="message_size_dequeue_counter")
+ public Long message_size_dequeue_counter;
+
+ public HashSet<String> consumer_selectors = new HashSet<String>();
+
+}
Copied: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/LoadStatusDTO.java (from r1341950, activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/CollectionsSupport.scala)
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/LoadStatusDTO.java?p2=activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/LoadStatusDTO.java&p1=activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/CollectionsSupport.scala&r1=1341950&r2=1347661&rev=1347661&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/CollectionsSupport.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/LoadStatusDTO.java Thu Jun 7 15:04:23 2012
@@ -14,20 +14,30 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.activemq.apollo.util
+package org.apache.activemq.apollo.dto;
+
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+
+import javax.xml.bind.annotation.*;
+import java.util.ArrayList;
/**
- * <p>
- * </p>
- *
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-object CollectionsSupport {
+@XmlRootElement(name="flow_report")
+@XmlAccessorType(XmlAccessType.FIELD)
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class LoadStatusDTO {
+
+ @XmlAttribute(name="id")
+ public String id;
+
+ @XmlAttribute(name="timestamp")
+ public long timestamp;
+
+ @XmlElementRef(name="queue")
+ public ArrayList<DestinationLoadDTO> queues = new ArrayList<DestinationLoadDTO>();
- def diff[T](prev:Set[T], next:Set[T]):(Set[T],Set[T],Set[T]) = {
- val updating = prev.intersect(next)
- val adding = next -- updating
- val removing = prev -- next
- (adding, updating, removing)
- }
-}
\ No newline at end of file
+ @XmlElementRef(name="topic")
+ public ArrayList<DestinationLoadDTO> topics = new ArrayList<DestinationLoadDTO>();
+}
Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/resources/org/apache/activemq/apollo/dto/jaxb.index
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/resources/org/apache/activemq/apollo/dto/jaxb.index?rev=1347661&r1=1347660&r2=1347661&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/resources/org/apache/activemq/apollo/dto/jaxb.index (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/resources/org/apache/activemq/apollo/dto/jaxb.index Thu Jun 7 15:04:23 2012
@@ -22,6 +22,7 @@ AggregateDestMetricsDTO
AuthenticationDTO
AutoGCServiceDTO
BrokerDTO
+LoadStatusDTO
BrokerStatusDTO
ConnectionStatusDTO
ConnectorStatusDTO
@@ -30,6 +31,7 @@ CustomServiceDTO
DataPageDTO
DestMetricsDTO
DestinationDTO
+DestinationLoadDTO
DetectDTO
DurableSubscriptionDTO
DurableSubscriptionDestinationDTO
Added: activemq/activemq-apollo/trunk/apollo-network/pom.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-network/pom.xml?rev=1347661&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-network/pom.xml (added)
+++ activemq/activemq-apollo/trunk/apollo-network/pom.xml Thu Jun 7 15:04:23 2012
@@ -0,0 +1,170 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Copyright (C) 2012 FuseSource Corp. All rights reserved.
+ http://fusesource.com
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>apollo-scala</artifactId>
+ <version>99-trunk-SNAPSHOT</version>
+ <relativePath>../apollo-scala</relativePath>
+ </parent>
+
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>apollo-network</artifactId>
+ <version>99-trunk-SNAPSHOT</version>
+ <packaging>jar</packaging>
+
+ <name>${project.artifactId}</name>
+ <description>Used to create a federated network of Apollo Brokers</description>
+
+ <properties>
+ </properties>
+
+ <dependencies>
+
+ <dependency>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>apollo-broker</artifactId>
+ <version>99-trunk-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-math</artifactId>
+ <version>2.2</version>
+ </dependency>
+ <dependency>
+ <groupId>org.fusesource.stompjms</groupId>
+ <artifactId>stompjms-client</artifactId>
+ <version>1.11</version>
+ </dependency>
+
+ <!-- Testing Dependencies -->
+ <dependency>
+ <groupId>org.scalatest</groupId>
+ <artifactId>scalatest_2.9.1</artifactId>
+ <version>${scalatest-version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ <version>${junit-version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ <version>${slf4j-version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>apollo-broker</artifactId>
+ <version>99-trunk-SNAPSHOT</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>apollo-util</artifactId>
+ <version>99-trunk-SNAPSHOT</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>apollo-stomp</artifactId>
+ <version>99-trunk-SNAPSHOT</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>apollo-openwire</artifactId>
+ <version>99-trunk-SNAPSHOT</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>apollo-leveldb</artifactId>
+ <version>99-trunk-SNAPSHOT</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.xerial.snappy</groupId>
+ <artifactId>snappy-java</artifactId>
+ <version>1.0.3</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>apollo-bdb</artifactId>
+ <version>99-trunk-SNAPSHOT</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>apollo-web</artifactId>
+ <version>99-trunk-SNAPSHOT</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.eclipse.jetty.aggregate</groupId>
+ <artifactId>jetty-all-server</artifactId>
+ <version>${jetty-version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>jul-to-slf4j</artifactId>
+ <version>${slf4j-version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>javax.servlet.jsp</groupId>
+ <artifactId>jsp-api</artifactId>
+ <version>2.1</version>
+ <scope>test</scope>
+ </dependency>
+
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <forkMode>always</forkMode>
+ <excludes>
+ <exclude>**/EnqueueRateScenariosTest.*</exclude>
+ </excludes>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+</project>
Added: activemq/activemq-apollo/trunk/apollo-network/src/main/resources/META-INF/services/org.apache.activemq.apollo/custom-service-factory.index
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-network/src/main/resources/META-INF/services/org.apache.activemq.apollo/custom-service-factory.index?rev=1347661&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-network/src/main/resources/META-INF/services/org.apache.activemq.apollo/custom-service-factory.index (added)
+++ activemq/activemq-apollo/trunk/apollo-network/src/main/resources/META-INF/services/org.apache.activemq.apollo/custom-service-factory.index Thu Jun 7 15:04:23 2012
@@ -0,0 +1,17 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements. See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License. You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+org.apache.activemq.apollo.broker.network.NetworkManagerFactory
\ No newline at end of file
Added: activemq/activemq-apollo/trunk/apollo-network/src/main/resources/META-INF/services/org.apache.activemq.apollo/dto-module.index
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-network/src/main/resources/META-INF/services/org.apache.activemq.apollo/dto-module.index?rev=1347661&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-network/src/main/resources/META-INF/services/org.apache.activemq.apollo/dto-module.index (added)
+++ activemq/activemq-apollo/trunk/apollo-network/src/main/resources/META-INF/services/org.apache.activemq.apollo/dto-module.index Thu Jun 7 15:04:23 2012
@@ -0,0 +1,17 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements. See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License. You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+org.apache.activemq.apollo.broker.network.dto.Module
\ No newline at end of file
Added: activemq/activemq-apollo/trunk/apollo-network/src/main/resources/org/apache/activemq/apollo/broker/network/dto/jaxb.index
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-network/src/main/resources/org/apache/activemq/apollo/broker/network/dto/jaxb.index?rev=1347661&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-network/src/main/resources/org/apache/activemq/apollo/broker/network/dto/jaxb.index (added)
+++ activemq/activemq-apollo/trunk/apollo-network/src/main/resources/org/apache/activemq/apollo/broker/network/dto/jaxb.index Thu Jun 7 15:04:23 2012
@@ -0,0 +1,18 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements. See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License. You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+NetworkManagerDTO
+BridgeDTO
\ No newline at end of file
Copied: activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/BridgingStrategy.scala (from r1341950, activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/CollectionsSupport.scala)
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/BridgingStrategy.scala?p2=activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/BridgingStrategy.scala&p1=activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/CollectionsSupport.scala&r1=1341950&r2=1347661&rev=1347661&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/CollectionsSupport.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/BridgingStrategy.scala Thu Jun 7 15:04:23 2012
@@ -14,20 +14,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.activemq.apollo.util
+package org.apache.activemq.apollo.broker.network
-/**
- * <p>
- * </p>
- *
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-object CollectionsSupport {
+case class BridgeInfo(from:String, to:String, kind:String, dest:String)
+
+trait BridgingStrategy {
+
+ def deploy(info:BridgeInfo)
+ def undeploy(info:BridgeInfo)
- def diff[T](prev:Set[T], next:Set[T]):(Set[T],Set[T],Set[T]) = {
- val updating = prev.intersect(next)
- val adding = next -- updating
- val removing = prev -- next
- (adding, updating, removing)
- }
}
\ No newline at end of file
Added: activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/BrokerLoadMonitor.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/BrokerLoadMonitor.scala?rev=1347661&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/BrokerLoadMonitor.scala (added)
+++ activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/BrokerLoadMonitor.scala Thu Jun 7 15:04:23 2012
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.broker.network
+
+import org.apache.activemq.apollo.dto.{LoadStatusDTO, JsonCodec}
+import dto.ClusterMemberDTO
+import org.fusesource.hawtdispatch._
+import collection.mutable.HashMap
+import java.net.URL
+import java.util.concurrent.TimeUnit._
+import org.apache.activemq.apollo.broker.Broker
+import org.apache.activemq.apollo.util.FileSupport._
+import org.apache.activemq.apollo.util.{Log, StateMachine, BaseService, Service}
+
+/**
+ *
+ */
+trait BrokerLoadMonitor extends Service {
+ var listener:BrokerLoadListener = _
+ def add(member:ClusterMemberDTO)
+ def remove(member:ClusterMemberDTO)
+}
+
+trait BrokerLoadListener {
+ def on_load_change(broker_load:LoadStatusDTO)
+}
+object RestLoadMonitor extends Log
+class RestLoadMonitor extends BaseService with BrokerLoadMonitor {
+ import collection.JavaConversions._
+ import RestLoadMonitor._
+
+ val dispatch_queue = createQueue("rest load monitor")
+ val members = HashMap[String, LoadMonitor]()
+ var poll_interval = 5*1000;
+
+ protected def _start(on_completed: Task) = {
+ on_completed.run()
+ schedule_reoccurring(1, SECONDS) {
+ for(monitor <- members.values) {
+ monitor.poll
+ }
+ }
+ }
+
+ protected def _stop(on_completed: Task) = {
+ on_completed.run()
+ }
+
+ case class LoadMonitor(id:String, url:URL) extends StateMachine {
+ var next_poll = Broker.now
+
+ protected def init() = IdleState()
+
+ case class IdleState() extends State {
+ def poll = react {
+ become(PollingState())
+ }
+ }
+
+ case class PollingState() extends State {
+ override def init() = {
+ val started = Broker.now
+
+ // Switch to the blockable thread poll since accessing the data
+ // is a blocking operation.
+ Broker.BLOCKABLE_THREAD_POOL {
+ try {
+ val dto = using(url.openStream()) {
+ is =>
+ JsonCodec.mapper.readValue(is, classOf[LoadStatusDTO])
+ }
+ dto.id = id
+ dto.timestamp = Broker.now
+ listener.on_load_change(dto)
+ } catch {
+ case e => debug(e, "Failed poll broker load of %s", id)
+ }
+ dispatch_queue {
+ become(IdleState())
+ }
+ }
+ }
+ }
+
+ def poll = {
+ if ( Broker.now >= next_poll ) {
+ state match {
+ case state:IdleState => state.poll
+ case _ =>
+ }
+ }
+ }
+ }
+
+ def add(member: ClusterMemberDTO) = dispatch_queue {
+ for(service <- member.services) {
+ if( service.kind == "webadmin" ) {
+ members.put(member.id, LoadMonitor(member.id, new URL(service.address)))
+ }
+ }
+ }
+
+ def remove(member: ClusterMemberDTO) = dispatch_queue {
+ members.remove(member.id)
+ }
+
+}
\ No newline at end of file
Added: activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/BrokerMetrics.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/BrokerMetrics.scala?rev=1347661&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/BrokerMetrics.scala (added)
+++ activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/BrokerMetrics.scala Thu Jun 7 15:04:23 2012
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.broker.network
+
+import org.apache.activemq.apollo.dto.LoadStatusDTO
+import org.apache.commons.math.stat.descriptive.DescriptiveStatistics
+import collection.mutable.HashMap
+
+class CounterDrivenRate {
+ var last_value = Long.MaxValue
+ val stats = new DescriptiveStatistics()
+ stats.setWindowSize(10)
+ var mean = 0.0d
+
+ def update(value:Long, duration:Double) = {
+ // is it a counter reset??
+ if( value < last_value ) {
+ last_value = value
+ } else {
+ val increment = value - last_value
+ stats.addValue(increment / duration)
+ mean = stats.getMean
+ }
+ }
+}
+
+class DestinationMetrics {
+ var consumer_count = 0L
+ var message_size = 0L
+ val enqueue_size_rate = new CounterDrivenRate()
+ val dequeue_size_rate = new CounterDrivenRate()
+}
+
+class BrokerMetrics() {
+
+ import collection.JavaConversions._
+ var queue_load = HashMap[String, DestinationMetrics]()
+// var topic_load = HashMap[String, DestinationMetrics]()
+ var timestamp = System.currentTimeMillis()
+
+ def update(current:LoadStatusDTO) = {
+ val now = System.currentTimeMillis()
+ val duration = (now - timestamp)/1000.0d
+ timestamp = now
+
+ var next_queue_load = HashMap[String, DestinationMetrics]()
+ for( dest <- current.queues ) {
+ val dest_load = queue_load.get(dest.id).getOrElse(new DestinationMetrics())
+ dest_load.message_size = dest.message_size
+ dest_load.enqueue_size_rate.update(dest.message_size_enqueue_counter, duration)
+ dest_load.dequeue_size_rate.update(dest.message_size_dequeue_counter, duration)
+ next_queue_load += dest.id -> dest_load
+ }
+ queue_load = next_queue_load
+
+// var next_topic_load = HashMap[String, DestinationMetrics]()
+// for( dest <- current.topics ) {
+// val dest_load = topic_load.get(dest.id).getOrElse(new DestinationMetrics())
+// dest_load.message_size = dest.message_size
+// dest_load.enqueue_size_rate.update(dest.message_size_enqueue_counter, duration)
+// dest_load.dequeue_size_rate.update(dest.message_size_dequeue_counter, duration)
+// next_topic_load += dest.id -> dest_load
+// }
+// topic_load = next_topic_load
+ }
+}
Copied: activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/ClusterMembershipMonitor.scala (from r1341950, activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/CollectionsSupport.scala)
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/ClusterMembershipMonitor.scala?p2=activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/ClusterMembershipMonitor.scala&p1=activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/CollectionsSupport.scala&r1=1341950&r2=1347661&rev=1347661&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/CollectionsSupport.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/ClusterMembershipMonitor.scala Thu Jun 7 15:04:23 2012
@@ -14,20 +14,29 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.activemq.apollo.util
+package org.apache.activemq.apollo.broker.network
-/**
- * <p>
- * </p>
- *
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-object CollectionsSupport {
+import dto.ClusterMemberDTO
+import org.apache.activemq.apollo.util.{BaseService, Service}
+import org.fusesource.hawtdispatch._
+
+trait ClusterMembershipMonitor extends Service {
+ var listener:ClusterMembershipListener = _
+}
- def diff[T](prev:Set[T], next:Set[T]):(Set[T],Set[T],Set[T]) = {
- val updating = prev.intersect(next)
- val adding = next -- updating
- val removing = prev -- next
- (adding, updating, removing)
+trait ClusterMembershipListener {
+ def on_cluster_change(members:Set[ClusterMemberDTO])
+}
+
+case class StaticClusterMembershipMonitor(members:Set[ClusterMemberDTO]) extends BaseService with ClusterMembershipMonitor {
+ val dispatch_queue = createQueue("bridge manager")
+ protected def _start(on_completed: Task) = {
+ dispatch_queue {
+ listener.on_cluster_change(members)
+ }
+ on_completed.run()
+ }
+ protected def _stop(on_completed: Task) = {
+ on_completed.run()
}
-}
\ No newline at end of file
+}
Added: activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/NetworkManager.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/NetworkManager.scala?rev=1347661&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/NetworkManager.scala (added)
+++ activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/NetworkManager.scala Thu Jun 7 15:04:23 2012
@@ -0,0 +1,227 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.broker.network
+
+import org.apache.activemq.apollo.util._
+import org.fusesource.hawtdispatch._
+import org.apache.activemq.apollo.broker.network.dto._
+import CollectionsSupport._
+import java.util.concurrent.TimeUnit._
+import collection.mutable.{LinkedHashMap, HashSet, ListBuffer, HashMap}
+import org.apache.activemq.apollo.broker.{Broker, CustomServiceFactory}
+import org.apache.activemq.apollo.dto.{LoadStatusDTO, CustomServiceDTO}
+
+/**
+ * <p>
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+object NetworkManagerFactory extends CustomServiceFactory with Log {
+ def create(broker: Broker, dto: CustomServiceDTO): Service = dto match {
+ case dto:NetworkManagerDTO =>
+ val rc = new NetworkManager(broker)
+ rc.config = dto
+ rc
+ case _ => null
+ }
+}
+
+object NetworkManager extends Log
+
+class NetworkManager(broker: Broker) extends BaseService with ClusterMembershipListener with BrokerLoadListener {
+ import NetworkManager._
+
+ val dispatch_queue = createQueue("bridge manager")
+
+ var config = new NetworkManagerDTO
+ var membership_monitor:ClusterMembershipMonitor = _
+ var members = Set[ClusterMemberDTO]()
+ var members_by_id = HashMap[String, ClusterMemberDTO]()
+ var load_monitor: BrokerLoadMonitor = _
+ var metrics_map = HashMap[String, BrokerMetrics]()
+ val bridges = HashMap[BridgeInfo, BridgeDeployer]()
+
+ protected def _start(on_completed: Task) = {
+ import collection.JavaConversions._
+
+ // TODO: also support dynamic membership discovery..
+ membership_monitor = StaticClusterMembershipMonitor(config.members.toSet)
+
+ membership_monitor.listener = this
+ membership_monitor.start(NOOP)
+
+ load_monitor = new RestLoadMonitor
+ load_monitor.listener = this
+ load_monitor.start(NOOP)
+
+ schedule_reoccurring(1, SECONDS) {
+ load_analysis
+ }
+ on_completed.run()
+ }
+
+ protected def _stop(on_completed: Task) = {
+ membership_monitor.stop(NOOP)
+ on_completed.run()
+ }
+
+ def on_cluster_change(value: Set[ClusterMemberDTO]) = dispatch_queue {
+ val (added, _, removed) = diff(members, value)
+ for( m <- removed ) {
+ load_monitor.remove(m)
+ }
+ for( m <- added ) {
+ load_monitor.add(m)
+ }
+ members = value
+ members_by_id = HashMap(members.toSeq.map(x=> (x.id->x)) : _*)
+ }
+
+ def on_load_change(dto: LoadStatusDTO) = dispatch_queue {
+ metrics_map.getOrElseUpdate(dto.id, new BrokerMetrics()).update(dto)
+ }
+
+ def load_analysis = {
+ dispatch_queue.assertExecuting()
+
+ // Lets remove load entries for members that were removed from the cluster.
+ val keys = metrics_map.keySet
+ val current = members.map(_.id).toSet
+ metrics_map = metrics_map -- (keys -- current)
+
+
+ class DemandStatus {
+ val needs_consumers = ListBuffer[(String,DestinationMetrics)]()
+ val has_consumers = ListBuffer[(String,DestinationMetrics)]()
+ }
+
+ val queue_demand_map = HashMap[String, DemandStatus]()
+
+ for( (broker, broker_load) <- metrics_map) {
+ for( (id, dest) <- broker_load.queue_load ) {
+ val status = queue_demand_map.getOrElseUpdate(id, new DemandStatus)
+ if( can_bridge_from(broker) && needs_more_consumers(dest) ) {
+ // The broker needs more consumers to drain the queue..
+ status.needs_consumers += (id->dest)
+ } else {
+ // The broker can drain the queue of other brokers..
+ if( can_bridge_to(broker) && dest.consumer_count > 0 ) {
+ status.has_consumers += (id->dest)
+ }
+ }
+ }
+ }
+
+ val desired_bridges = HashSet[BridgeInfo]()
+ for( (id, demand) <- queue_demand_map ) {
+ for( (to, to_metrics)<- demand.needs_consumers; (from, from_metrics) <-demand.has_consumers ) {
+ // we could get fancy and compare the to_metrics and from_metrics to avoid
+ // setting up bridges that won't make a big difference..
+ desired_bridges += BridgeInfo(from, to, "queue", id)
+ }
+ }
+
+ val (bridges_added, _, bridges_removed) = diff(bridges.keySet, desired_bridges)
+
+ // Stop and remove the bridges that are no longer needed..
+ for( info <- bridges_removed ) {
+ bridges.remove(info).get.undeploy
+ }
+
+ // Create and start the new bridges..
+ for( info <- bridges_added ) {
+ val controller = BridgeDeployer(info)
+ bridges.put(info, controller)
+ controller.deploy
+ }
+
+ }
+
+ var local_broker_id = ""
+ var enable_duplex = false
+
+ def can_bridge_from(broker:String):Boolean = broker==local_broker_id
+ def can_bridge_to(broker:String):Boolean = {
+ if ( broker == local_broker_id) {
+ enable_duplex
+ } else {
+ true
+ }
+ }
+
+ def needs_more_consumers(dest:DestinationMetrics):Boolean = {
+
+ // nothing to drain.. so no need for consumers.
+ if( dest.message_size == 0 && dest.enqueue_size_rate.mean == 0) {
+ return false
+ }
+
+ val drain_rate = dest.dequeue_size_rate.mean - dest.enqueue_size_rate.mean
+ if( drain_rate < 0 ) {
+ // Not draining...
+ return true
+ }
+
+ // Might need a consumer due to being drained too slowly..
+ val drain_eta_in_seconds = dest.message_size / drain_rate
+ return drain_eta_in_seconds > 60
+ }
+
+
+ val bridging_strategies = LinkedHashMap[String, BridgingStrategy]()
+ bridging_strategies.put("stomp", new StompBridgingStrategy(this))
+
+ case class BridgeDeployer(info:BridgeInfo) {
+
+ def to = members_by_id.get(info.to)
+ def from = members_by_id.get(info.from)
+
+ var bridging_strategy:BridgingStrategy = _
+ var bridging_strategy_info : BridgeInfo = _
+
+ def deploy:Unit = {
+ // Lets find a service kind that we can use to bridge...
+ import collection.JavaConversions._
+ for( to <- to ; from <-from ) {
+
+ // bridging_strategies are kept in preferred order
+ for( (service_kind, strategy) <- bridging_strategies ) {
+
+ // Lets look to see if we can use the strategy with services exposed by the broker..
+ for( to_service <- to.services; from_service <- from.services ) {
+ if( to_service.kind==service_kind && to_service.kind==from_service.kind ) {
+ bridging_strategy = strategy
+ bridging_strategy_info = BridgeInfo(from_service.address, to_service.address, info.kind, info.dest)
+ bridging_strategy.deploy( bridging_strategy_info )
+ }
+ }
+ }
+ }
+ }
+
+ def undeploy = {
+ if( bridging_strategy!=null ) {
+ bridging_strategy.undeploy(bridging_strategy_info)
+ bridging_strategy = null
+ bridging_strategy_info = null
+ }
+ }
+ }
+
+}
+
Added: activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/StompBridgingStrategy.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/StompBridgingStrategy.scala?rev=1347661&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/StompBridgingStrategy.scala (added)
+++ activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/StompBridgingStrategy.scala Thu Jun 7 15:04:23 2012
@@ -0,0 +1,226 @@
+package org.apache.activemq.apollo.broker.network
+
+import org.fusesource.hawtdispatch._
+import java.net.URI
+import org.apache.activemq.apollo.util.{StateMachine, Log}
+import org.fusesource.hawtbuf.AsciiBuffer
+import org.fusesource.stomp.codec.StompFrame
+import org.apache.activemq.apollo.broker.Broker
+import java.util.Properties
+import org.fusesource.stomp.client.{CallbackConnection, Stomp}
+import java.util.concurrent.TimeUnit
+import collection.mutable.HashMap
+
+
+object StompBridgingStrategy extends Log {
+}
+
+class StompBridgingStrategy(val manager:NetworkManager) extends BridgingStrategy {
+ import StompBridgingStrategy._
+ import org.fusesource.stomp.client.Constants._
+ import org.fusesource.hawtbuf.Buffer._
+ def dispatch_queue = manager.dispatch_queue
+
+ val bridges = HashMap[(String, String), Bridge]()
+
+ def deploy(info:BridgeInfo) = {
+ dispatch_queue.assertExecuting()
+ val bridge = bridges.getOrElseUpdate((info.from, info.to), new Bridge(info.from, info.to))
+ bridge.deploy(info.kind, info.dest)
+ }
+
+
+ def undeploy(info:BridgeInfo) = {
+ dispatch_queue.assertExecuting()
+ for( bridge <- bridges.get((info.from, info.to)) ) {
+ bridge.undeploy(info.kind, info.dest)
+ }
+ }
+
+ class Bridge(from:String, to:String) {
+ val dispatch_queue = createQueue("bridge %s -> %s".format(from, to))
+
+ val from_connection = ConnectionStateMachine(new URI(from))
+ val to_connection = ConnectionStateMachine(new URI(to))
+
+ from_connection.refiller = ^{
+
+ }
+
+ dispatch_queue {
+ from_connection.connect
+ to_connection.connect
+ }
+
+ case class ConnectionStateMachine(uri:URI) extends StateMachine {
+
+ var id_counter = 0L
+ var subscriptions = HashMap[AsciiBuffer, AsciiBuffer]()
+ var pending_sends = HashMap[Long, (StompFrame, ()=>Unit)]()
+
+ var refiller: Runnable = ^{ sys.error("refiller not set") }
+ var receive_handler: (StompFrame)=>Boolean = frame => {
+ info("dropping frame: %s", frame)
+ true
+ }
+
+
+ def next_id = {
+ val rc = id_counter
+ id_counter += 1
+ rc
+ }
+
+ def next_hex_id = {
+ ascii(next_id.toHexString)
+ }
+
+
+ def init() = DisconnectedState()
+
+ case class DisconnectedState() extends State {
+ def connect = react {
+ become(ConnectingState())
+ }
+ }
+
+ case class ConnectingState() extends State {
+ override def init() = {
+ debug("Connecting bridge to %s", uri)
+ val to_stomp = new Stomp()
+ to_stomp.setDispatchQueue(dispatch_queue)
+ to_stomp.setRemoteURI(uri)
+ to_stomp.setLogin("admin")
+ to_stomp.setPasscode("password")
+ to_stomp.setBlockingExecutor(Broker.BLOCKABLE_THREAD_POOL)
+ val headers = new Properties()
+ headers.put("client-type", "apollo-bridge")
+ to_stomp.setCustomHeaders(headers)
+
+ to_stomp.connectCallback(new org.fusesource.stomp.client.Callback[CallbackConnection] {
+ override def onSuccess(value: CallbackConnection) = react {
+ become(ConnectedState(value))
+ }
+ override def onFailure(value: Throwable) = react {
+ debug("Could not connect bridge to %s due to: ", uri, value)
+ become(ReconnectDelayState(1000))
+ }
+ })
+ }
+ }
+
+ case class ReconnectDelayState(delay:Long) extends State {
+ override def init() = {
+ debug("Will attempt a reconnect to %s in %d ms", uri, delay)
+ dispatch_queue.after(delay, TimeUnit.MILLISECONDS) {
+ react(become(ConnectingState()))
+ }
+ }
+ }
+
+ case class ConnectedState(connection:CallbackConnection) extends State {
+
+ var closed = false
+
+ override def init() = {
+ debug("Bridge connected to: %s", uri)
+ connection.receive(new org.fusesource.stomp.client.Callback[StompFrame] {
+ override def onSuccess(value: StompFrame) = {
+ if( !receive_handler(value) ) {
+ connection.suspend()
+ }
+ }
+ override def onFailure(value: Throwable) = {
+ failed(value)
+ }
+ })
+ connection.refiller(refiller)
+
+ // Reconnect any subscriptions.
+ subscriptions.keySet.foreach(subscribe(_))
+ // Re-send messages..
+ pending_sends.values.foreach(x => do_send(x._1, x._2))
+
+ }
+
+ def do_send(frame:StompFrame, on_complete: ()=>Unit) = {
+ connection.request(frame, new org.fusesource.stomp.client.Callback[StompFrame] {
+ override def onSuccess(response: StompFrame) = on_complete()
+ override def onFailure(value: Throwable) = failed(value)
+ })
+ }
+
+ def failed(value: Throwable)= {
+ debug("Bridge connection to %s failed due to: ", uri, value)
+ close(ReconnectDelayState(1000))
+ }
+
+ def close(next: State) = {
+ if( closed ) {
+ become(next)
+ } else {
+ closed = true
+ val pause = Pause(next)
+ become(pause)
+ debug("Closing connection to %s", uri)
+ connection.close(^{
+ debug("Closed connection to %s", uri)
+ pause.continue
+ })
+ }
+ }
+ }
+
+ def connect = react[DisconnectedState] { state => state.connect }
+ def for_connection(func: (CallbackConnection)=>Unit) = react[ConnectedState] { state => func(state.connection) }
+ def suspend = for_connection { connection => connection.suspend() }
+ def resume = for_connection { connection => connection.resume() }
+
+ def subscribe(destination:AsciiBuffer) = {
+ val id = subscriptions.getOrElseUpdate(destination, next_hex_id)
+ for_connection{ connection =>
+ val frame = new StompFrame(SUBSCRIBE)
+ frame.addHeader(ID, id)
+ frame.addHeader(DESTINATION, destination)
+ frame.addHeader(ACK_MODE, CLIENT)
+ connection.send(frame, null)
+ }
+ }
+
+ def unsubscribe(destination:AsciiBuffer) = {
+ subscriptions.remove(destination) match {
+ case Some(id) =>
+ for_connection{ connection =>
+ val frame = new StompFrame(UNSUBSCRIBE)
+ frame.addHeader(ID, id)
+ connection.send(frame, null)
+ }
+ case None =>
+ }
+ }
+
+ def send(destination:StompFrame, on_complete: ()=>Unit) = {
+ val id = next_id
+ val cb = ()=>{
+ pending_sends.remove(id)
+ on_complete()
+ }
+ pending_sends.put(id, (destination, cb))
+ react[ConnectedState] { state => state.do_send(destination, cb) }
+ }
+ }
+
+ def deploy(kind:String, destination:String) = dispatch_queue {
+ val destination_uri = ascii("/%s/%s".format(kind, destination))
+ from_connection.subscribe(destination_uri)
+ }
+
+ def undeploy(kind:String, destination:String) = dispatch_queue {
+ val destination_uri = ascii("/%s/%s".format(kind, destination))
+ from_connection.unsubscribe(destination_uri)
+ }
+
+ }
+
+}
+
Copied: activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/BridgeDTO.java (from r1341950, activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/CollectionsSupport.scala)
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/BridgeDTO.java?p2=activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/BridgeDTO.java&p1=activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/CollectionsSupport.scala&r1=1341950&r2=1347661&rev=1347661&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/CollectionsSupport.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/BridgeDTO.java Thu Jun 7 15:04:23 2012
@@ -14,20 +14,22 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.activemq.apollo.util
+package org.apache.activemq.apollo.broker.network.dto;
+
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+
+import javax.xml.bind.annotation.*;
+import java.util.ArrayList;
/**
- * <p>
- * </p>
- *
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-object CollectionsSupport {
+@XmlRootElement(name="bridge")
+@XmlAccessorType(XmlAccessType.FIELD)
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class BridgeDTO {
+
+ @XmlElementRef(name="rule")
+ public ArrayList<RuleDTO> rules = new ArrayList<RuleDTO>();
- def diff[T](prev:Set[T], next:Set[T]):(Set[T],Set[T],Set[T]) = {
- val updating = prev.intersect(next)
- val adding = next -- updating
- val removing = prev -- next
- (adding, updating, removing)
- }
-}
\ No newline at end of file
+}
Copied: activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/ClusterMemberDTO.java (from r1341950, activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/CollectionsSupport.scala)
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/ClusterMemberDTO.java?p2=activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/ClusterMemberDTO.java&p1=activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/CollectionsSupport.scala&r1=1341950&r2=1347661&rev=1347661&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/CollectionsSupport.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/ClusterMemberDTO.java Thu Jun 7 15:04:23 2012
@@ -14,20 +14,24 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.activemq.apollo.util
+package org.apache.activemq.apollo.broker.network.dto;
+
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+
+import javax.xml.bind.annotation.*;
+import java.util.ArrayList;
/**
- * <p>
- * </p>
- *
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-object CollectionsSupport {
+@XmlAccessorType(XmlAccessType.FIELD)
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class ClusterMemberDTO {
+
+ @XmlAttribute(name="id")
+ public String id;
+
+ @XmlElementRef(name="service")
+ public ArrayList<ClusterServiceDTO> services = new ArrayList<ClusterServiceDTO>();
- def diff[T](prev:Set[T], next:Set[T]):(Set[T],Set[T],Set[T]) = {
- val updating = prev.intersect(next)
- val adding = next -- updating
- val removing = prev -- next
- (adding, updating, removing)
- }
-}
\ No newline at end of file
+}
Copied: activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/ClusterServiceDTO.java (from r1341950, activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/CollectionsSupport.scala)
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/ClusterServiceDTO.java?p2=activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/ClusterServiceDTO.java&p1=activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/CollectionsSupport.scala&r1=1341950&r2=1347661&rev=1347661&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/CollectionsSupport.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/ClusterServiceDTO.java Thu Jun 7 15:04:23 2012
@@ -14,20 +14,27 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.activemq.apollo.util
+package org.apache.activemq.apollo.broker.network.dto;
+
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlAttribute;
+import javax.xml.bind.annotation.XmlRootElement;
/**
- * <p>
- * </p>
- *
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-object CollectionsSupport {
+@XmlRootElement(name="cluster_service")
+@XmlAccessorType(XmlAccessType.FIELD)
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class ClusterServiceDTO {
+
+ @XmlAttribute(name="kind")
+ public String kind;
+
+ @XmlAttribute(name="address")
+ public String address;
- def diff[T](prev:Set[T], next:Set[T]):(Set[T],Set[T],Set[T]) = {
- val updating = prev.intersect(next)
- val adding = next -- updating
- val removing = prev -- next
- (adding, updating, removing)
- }
-}
\ No newline at end of file
+}
Copied: activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/Module.scala (from r1341950, activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/CollectionsSupport.scala)
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/Module.scala?p2=activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/Module.scala&p1=activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/CollectionsSupport.scala&r1=1341950&r2=1347661&rev=1347661&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/CollectionsSupport.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/Module.scala Thu Jun 7 15:04:23 2012
@@ -1,3 +1,5 @@
+package org.apache.activemq.apollo.broker.network.dto
+
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
@@ -14,20 +16,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.activemq.apollo.util
+import org.apache.activemq.apollo.util.DtoModule
/**
- * <p>
- * </p>
- *
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-object CollectionsSupport {
+class Module extends DtoModule {
+
+ def dto_package = "org.apache.activemq.apollo.broker.network.dto"
+ def extension_classes = Array(classOf[NetworkManagerDTO])
- def diff[T](prev:Set[T], next:Set[T]):(Set[T],Set[T],Set[T]) = {
- val updating = prev.intersect(next)
- val adding = next -- updating
- val removing = prev -- next
- (adding, updating, removing)
- }
}
\ No newline at end of file
Copied: activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/NetworkManagerDTO.java (from r1341950, activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/CollectionsSupport.scala)
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/NetworkManagerDTO.java?p2=activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/NetworkManagerDTO.java&p1=activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/CollectionsSupport.scala&r1=1341950&r2=1347661&rev=1347661&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/CollectionsSupport.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/NetworkManagerDTO.java Thu Jun 7 15:04:23 2012
@@ -14,20 +14,26 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.activemq.apollo.util
+package org.apache.activemq.apollo.broker.network.dto;
+
+import org.apache.activemq.apollo.dto.CustomServiceDTO;
+import org.apache.activemq.apollo.dto.ServiceDTO;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+
+import javax.xml.bind.annotation.*;
+import java.util.ArrayList;
/**
- * <p>
- * </p>
- *
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-object CollectionsSupport {
+@XmlRootElement(name="network_manager")
+@XmlAccessorType(XmlAccessType.FIELD)
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class NetworkManagerDTO extends CustomServiceDTO {
+
+// @XmlElementRef(name="bridge")
+// public ArrayList<BridgeDTO> bridges = new ArrayList<BridgeDTO>();
- def diff[T](prev:Set[T], next:Set[T]):(Set[T],Set[T],Set[T]) = {
- val updating = prev.intersect(next)
- val adding = next -- updating
- val removing = prev -- next
- (adding, updating, removing)
- }
-}
\ No newline at end of file
+ @XmlElement(name="member")
+ public ArrayList<ClusterMemberDTO> members = new ArrayList<ClusterMemberDTO>();
+}
Copied: activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/RuleDTO.java (from r1341950, activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/CollectionsSupport.scala)
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/RuleDTO.java?p2=activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/RuleDTO.java&p1=activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/CollectionsSupport.scala&r1=1341950&r2=1347661&rev=1347661&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/CollectionsSupport.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/RuleDTO.java Thu Jun 7 15:04:23 2012
@@ -14,20 +14,30 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.activemq.apollo.util
+
+package org.apache.activemq.apollo.broker.network.dto;
+
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+
+import javax.xml.bind.annotation.*;
/**
- * <p>
- * </p>
+ * Allow you to customize the mqtt protocol implementation.
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-object CollectionsSupport {
+@XmlRootElement(name="rule")
+@XmlAccessorType(XmlAccessType.FIELD)
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class RuleDTO {
+
+ @XmlAttribute(name="include")
+ public String include;
+
+ @XmlAttribute(name="exclude")
+ public String exclude;
+
+ @XmlAttribute(name="kind")
+ public String kind;
- def diff[T](prev:Set[T], next:Set[T]):(Set[T],Set[T],Set[T]) = {
- val updating = prev.intersect(next)
- val adding = next -- updating
- val removing = prev -- next
- (adding, updating, removing)
- }
-}
\ No newline at end of file
+}
Copied: activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/package-info.java (from r1341950, activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/CollectionsSupport.scala)
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/package-info.java?p2=activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/package-info.java&p1=activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/CollectionsSupport.scala&r1=1341950&r2=1347661&rev=1347661&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/CollectionsSupport.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/package-info.java Thu Jun 7 15:04:23 2012
@@ -14,20 +14,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.activemq.apollo.util
/**
- * <p>
- * </p>
- *
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ * The JAXB POJOs for the
+ * The JAXB POJOs for the
+ * <a href="http://activemq.apache.org/schema/activemq/apollo/xml-configuration.html">XML Configuration</a>
+ * of the ActiveMQ Broker.
*/
-object CollectionsSupport {
+@javax.xml.bind.annotation.XmlSchema(
+ namespace = "http://activemq.apache.org/schema/activemq/apollo",
+ elementFormDefault = javax.xml.bind.annotation.XmlNsForm.QUALIFIED)
+package org.apache.activemq.apollo.broker.network.dto;
- def diff[T](prev:Set[T], next:Set[T]):(Set[T],Set[T],Set[T]) = {
- val updating = prev.intersect(next)
- val adding = next -- updating
- val removing = prev -- next
- (adding, updating, removing)
- }
-}
\ No newline at end of file
Added: activemq/activemq-apollo/trunk/apollo-network/src/test/resources/apollo-network-1.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-network/src/test/resources/apollo-network-1.xml?rev=1347661&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-network/src/test/resources/apollo-network-1.xml (added)
+++ activemq/activemq-apollo/trunk/apollo-network/src/test/resources/apollo-network-1.xml Thu Jun 7 15:04:23 2012
@@ -0,0 +1,40 @@
+<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<broker xmlns="http://activemq.apache.org/schema/activemq/apollo">
+ <notes>This broker configuration is what the unit tests in this module load up.</notes>
+
+ <virtual_host id="broker1">
+ <host_name>localhost</host_name>
+ </virtual_host>
+
+ <!-- once we get dynamic discovery working we can avoid using fixed ports -->
+ <web_admin bind="http://0.0.0.0:41000"/>
+ <connector id="tcp" bind="tcp://0.0.0.0:41001"/>
+
+ <network_manager>
+ <member id="broker1">
+ <service kind="web_admin" address="http://0.0.0.0:41000"/>
+ <service kind="stomp" address="tcp://0.0.0.0:41001"/>
+ </member>
+ <member id="broker2">
+ <service kind="web_admin" address="http://0.0.0.0:42000"/>
+ <service kind="stomp" address="tcp://0.0.0.0:42001"/>
+ </member>
+ </network_manager>
+
+</broker>
\ No newline at end of file
Added: activemq/activemq-apollo/trunk/apollo-network/src/test/resources/apollo-network-2.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-network/src/test/resources/apollo-network-2.xml?rev=1347661&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-network/src/test/resources/apollo-network-2.xml (added)
+++ activemq/activemq-apollo/trunk/apollo-network/src/test/resources/apollo-network-2.xml Thu Jun 7 15:04:23 2012
@@ -0,0 +1,40 @@
+<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<broker xmlns="http://activemq.apache.org/schema/activemq/apollo">
+ <notes>This broker configuration is what the unit tests in this module load up.</notes>
+
+ <virtual_host id="broker2">
+ <host_name>localhost</host_name>
+ </virtual_host>
+
+ <!-- once we get dynamic discovery working we can avoid using fixed ports -->
+ <web_admin bind="http://0.0.0.0:42000"/>
+ <connector id="tcp" bind="tcp://0.0.0.0:42001"/>
+
+ <network_manager>
+ <member id="broker1">
+ <service kind="web_admin" address="http://0.0.0.0:41000"/>
+ <service kind="stomp" address="tcp://0.0.0.0:41001"/>
+ </member>
+ <member id="broker2">
+ <service kind="web_admin" address="http://0.0.0.0:42000"/>
+ <service kind="stomp" address="tcp://0.0.0.0:42001"/>
+ </member>
+ </network_manager>
+
+</broker>
\ No newline at end of file
Added: activemq/activemq-apollo/trunk/apollo-network/src/test/resources/log4j.properties
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-network/src/test/resources/log4j.properties?rev=1347661&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-network/src/test/resources/log4j.properties (added)
+++ activemq/activemq-apollo/trunk/apollo-network/src/test/resources/log4j.properties Thu Jun 7 15:04:23 2012
@@ -0,0 +1,36 @@
+#
+# Copyright (C) 2012 FuseSource Corp. All rights reserved.
+# http://fusesource.com
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+#
+# The logging properties used during tests..
+#
+log4j.rootLogger=WARN, console, file
+log4j.logger.org.apache.activemq=INFO
+log4j.logger.org.fusesource=INFO
+
+# Console will only display warnnings
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%-5p | %t | %m%n
+log4j.appender.console.threshold=TRACE
+
+# File appender will contain all info messages
+log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d | %-5p | %m | %c | %t%n
+log4j.appender.file.file=target/test.log
+log4j.appender.file.append=true
Propchange: activemq/activemq-apollo/trunk/apollo-network/src/test/resources/log4j.properties
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/activemq-apollo/trunk/apollo-network/src/test/scala/org/apache/activemq/apollo/broker/network/NetworkTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-network/src/test/scala/org/apache/activemq/apollo/broker/network/NetworkTest.scala?rev=1347661&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-network/src/test/scala/org/apache/activemq/apollo/broker/network/NetworkTest.scala (added)
+++ activemq/activemq-apollo/trunk/apollo-network/src/test/scala/org/apache/activemq/apollo/broker/network/NetworkTest.scala Thu Jun 7 15:04:23 2012
@@ -0,0 +1,21 @@
+package org.apache.activemq.apollo.broker.network
+
+import org.scalatest.matchers.ShouldMatchers
+import org.scalatest.BeforeAndAfterEach
+import org.apache.activemq.apollo.broker.MultiBrokerTestSupport
+
+
+class NetworkTest extends MultiBrokerTestSupport with ShouldMatchers with BeforeAndAfterEach {
+
+ override def broker_config_uris = Array(
+ "xml:classpath:apollo-network-1.xml",
+ "xml:classpath:apollo-network-2.xml"
+ )
+
+ test("basics") {
+ admins(0).broker should not be(null)
+ val config = admins(0).broker.config
+ admins(1).broker should not be(null)
+ }
+
+}
\ No newline at end of file
Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/SecurityTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/SecurityTest.scala?rev=1347661&r1=1347660&r2=1347661&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/SecurityTest.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/SecurityTest.scala Thu Jun 7 15:04:23 2012
@@ -23,7 +23,7 @@ abstract class SecurityTest extends Open
override val broker_config_uri: String = "xml:classpath:apollo-openwire-secure.xml"
- override protected def beforeAll = {
+ override def beforeAll = {
try {
val login_file = new java.io.File(getClass.getClassLoader.getResource("login.config").getFile())
System.setProperty("java.security.auth.login.config", login_file.getCanonicalPath)
Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/SslSecurityTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/SslSecurityTest.scala?rev=1347661&r1=1347660&r2=1347661&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/SslSecurityTest.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/SslSecurityTest.scala Thu Jun 7 15:04:23 2012
@@ -24,7 +24,7 @@ class SslSecurityTest extends OpenwireTe
override val broker_config_uri: String = "xml:classpath:apollo-openwire-ssl-secure.xml"
override val transport_scheme = "ssl"
- override protected def beforeAll = {
+ override def beforeAll = {
// System.setProperty("javax.net.debug", "all")
try {
val login_file = new java.io.File(getClass.getClassLoader.getResource("login.config").getFile())
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala?rev=1347661&r1=1347660&r2=1347661&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala Thu Jun 7 15:04:23 2012
@@ -1898,7 +1898,7 @@ class StompSecurityTest extends StompTes
override def broker_config_uri: String = "xml:classpath:apollo-stomp-secure.xml"
- override protected def beforeAll = {
+ override def beforeAll = {
try {
val login_file = new java.io.File(getClass.getClassLoader.getResource("login.config").getFile())
System.setProperty("java.security.auth.login.config", login_file.getCanonicalPath)
@@ -2102,7 +2102,7 @@ class StompSslSecurityTest extends Stomp
override def broker_config_uri: String = "xml:classpath:apollo-stomp-ssl-secure.xml"
- override protected def beforeAll = {
+ override def beforeAll = {
// System.setProperty("javax.net.debug", "all")
try {
val login_file = new java.io.File(getClass.getClassLoader.getResource("login.config").getFile())
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompWebTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompWebTest.scala?rev=1347661&r1=1347660&r2=1347661&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompWebTest.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompWebTest.scala Thu Jun 7 15:04:23 2012
@@ -72,7 +72,7 @@ abstract class StompWebSocketTestSupport
def ws_port: Int = connector_port("ws").get
- override protected def beforeAll() = {
+ override def beforeAll() = {
try {
driver = create_web_driver(test_data_dir / "profile")
super.beforeAll()
@@ -82,7 +82,7 @@ abstract class StompWebSocketTestSupport
}
}
- override protected def afterAll() = {
+ override def afterAll() = {
if (driver != null) {
driver.quit()
driver = null