You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/07/16 20:55:29 UTC
svn commit: r1362190 - in /activemq/activemq-apollo/trunk/apollo-network: ./
src/main/resources/META-INF/services/org.apache.activemq.apollo/
src/main/resources/org/apache/activemq/apollo/broker/network/dto/
src/main/scala/org/apache/activemq/apollo/br...
Author: chirino
Date: Mon Jul 16 18:55:29 2012
New Revision: 1362190
URL: http://svn.apache.org/viewvc?rev=1362190&view=rev
Log:
Added a Zeroconf based cluster membership monitor.
Added:
activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/ZeroconfMembershipMonitor.scala
activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/ZeroconfMembershipMonitorDTO.java
- copied, changed from r1355565, activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/Module.scala
Modified:
activemq/activemq-apollo/trunk/apollo-network/pom.xml
activemq/activemq-apollo/trunk/apollo-network/src/main/resources/META-INF/services/org.apache.activemq.apollo/cluster-membership-factory.index
activemq/activemq-apollo/trunk/apollo-network/src/main/resources/org/apache/activemq/apollo/broker/network/dto/jaxb.index
activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/NetworkManager.scala
activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/ClusterMemberDTO.java
activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/Module.scala
Modified: activemq/activemq-apollo/trunk/apollo-network/pom.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-network/pom.xml?rev=1362190&r1=1362189&r2=1362190&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-network/pom.xml (original)
+++ activemq/activemq-apollo/trunk/apollo-network/pom.xml Mon Jul 16 18:55:29 2012
@@ -58,6 +58,14 @@
<version>1.11</version>
</dependency>
+ <!-- Optional Dependencies -->
+ <dependency>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>activemq-jmdns_1.0</artifactId>
+ <version>5.7-SNAPSHOT</version>
+ <optional>true</optional>
+ </dependency>
+
<!-- Testing Dependencies -->
<dependency>
<groupId>org.scalatest</groupId>
Modified: activemq/activemq-apollo/trunk/apollo-network/src/main/resources/META-INF/services/org.apache.activemq.apollo/cluster-membership-factory.index
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-network/src/main/resources/META-INF/services/org.apache.activemq.apollo/cluster-membership-factory.index?rev=1362190&r1=1362189&r2=1362190&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-network/src/main/resources/META-INF/services/org.apache.activemq.apollo/cluster-membership-factory.index (original)
+++ activemq/activemq-apollo/trunk/apollo-network/src/main/resources/META-INF/services/org.apache.activemq.apollo/cluster-membership-factory.index Mon Jul 16 18:55:29 2012
@@ -14,4 +14,5 @@
## See the License for the specific language governing permissions and
## limitations under the License.
## ---------------------------------------------------------------------------
-org.apache.activemq.apollo.broker.network.JVMMembershipMonitorFactory
\ No newline at end of file
+org.apache.activemq.apollo.broker.network.JVMMembershipMonitorFactory
+org.apache.activemq.apollo.broker.network.ZeroconfMembershipMonitorFactory
\ No newline at end of file
Modified: activemq/activemq-apollo/trunk/apollo-network/src/main/resources/org/apache/activemq/apollo/broker/network/dto/jaxb.index
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-network/src/main/resources/org/apache/activemq/apollo/broker/network/dto/jaxb.index?rev=1362190&r1=1362189&r2=1362190&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-network/src/main/resources/org/apache/activemq/apollo/broker/network/dto/jaxb.index (original)
+++ activemq/activemq-apollo/trunk/apollo-network/src/main/resources/org/apache/activemq/apollo/broker/network/dto/jaxb.index Mon Jul 16 18:55:29 2012
@@ -17,4 +17,5 @@
NetworkManagerDTO
BridgeDTO
MembershipMonitorDTO
-JVMMembershipMonitorDTO
\ No newline at end of file
+JVMMembershipMonitorDTO
+ZeroconfMembershipMonitorDTO
\ No newline at end of file
Modified: activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/NetworkManager.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/NetworkManager.scala?rev=1362190&r1=1362189&r2=1362190&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/NetworkManager.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/NetworkManager.scala Mon Jul 16 18:55:29 2012
@@ -69,6 +69,7 @@ object NetworkManager extends Log {
connector.socket_address match {
case address:InetSocketAddress =>
rc = rc.replaceAllLiterally("{{connector."+id+".port}}", ""+address.getPort)
+ case _ =>
}
case _ =>
}
@@ -121,6 +122,8 @@ class NetworkManager(broker: Broker) ext
var monitor = MembershipMonitorFactory.create(broker, monitor_dto)
if(monitor!=null) {
monitors ::= monitor
+ } else {
+ warn("Could not create the membership monitor for: "+monitor_dto)
}
}
@@ -146,9 +149,11 @@ class NetworkManager(broker: Broker) ext
def on_membership_change(value: collection.Set[ClusterMemberDTO]) = dispatch_queue {
val (added, _, removed) = diff(members, value)
for( m <- removed ) {
+ info("Broker host left the network: %s", m.id)
load_monitor.remove(m)
}
for( m <- added ) {
+ info("Broker host joined the network: %s", m.id)
load_monitor.add(m)
}
members = value
Added: activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/ZeroconfMembershipMonitor.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/ZeroconfMembershipMonitor.scala?rev=1362190&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/ZeroconfMembershipMonitor.scala (added)
+++ activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/ZeroconfMembershipMonitor.scala Mon Jul 16 18:55:29 2012
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.broker.network
+
+import dto._
+import org.apache.activemq.jmdns._
+import java.io.IOException
+
+import java.net.InetAddress
+import org.apache.activemq.apollo.broker.Broker
+import org.apache.activemq.apollo.util.BaseService
+import org.fusesource.hawtdispatch._
+import org.apache.activemq.apollo.dto.JsonCodec
+import org.fusesource.hawtbuf.Buffer
+import collection.mutable
+import collection.mutable.ListBuffer
+import java.util.concurrent.TimeUnit
+import org.apache.activemq.apollo.util.CollectionsSupport._
+
+/**
+ * Creates a ZeroconfMembershipMonitor when handed a ZeroconfMembershipMonitorDTO;
+ * yields null for every other kind of membership monitor configuration.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+object ZeroconfMembershipMonitorFactory extends MembershipMonitorFactory.SPI {
+  def create(broker: Broker, dto: MembershipMonitorDTO): MembershipMonitor = {
+    if (dto.isInstanceOf[ZeroconfMembershipMonitorDTO]) new ZeroconfMembershipMonitor(broker, dto.asInstanceOf[ZeroconfMembershipMonitorDTO])
+    else null
+  }
+}
+
+/**
+ * <p>
+ * Uses Zeroconf to discover the cluster members. The <code>self</code> entries of
+ * the configured network managers are registered as JmDNS services for the group.
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+class ZeroconfMembershipMonitor(val broker: Broker, val dto:ZeroconfMembershipMonitorDTO) extends BaseService with MembershipMonitor {
+
+  // A val, not a def: a def would call createQueue() again on every access.
+  val dispatch_queue: DispatchQueue = createQueue()
+  val members = mutable.HashMap[String, ClusterMemberDTO]()
+  var jmdns:JmDNS = _
+  // The service records most recently handed to jmdns for registration.
+  var localservices = Set[ServiceInfo]()
+
+  protected def _start(on_completed: Task) = {
+    if (dto.group == null) {
+      throw new IOException("You must specify a group to discover")
+    }
+    // Start from an empty set: the JmDNS instance created below has no registrations.
+    localservices = Set[ServiceInfo]()
+    jmdns = new JmDNS(if (dto.address != null) {
+      InetAddress.getByName(dto.address)
+    } else {
+      InetAddress.getLocalHost
+    })
+
+    Broker.BLOCKABLE_THREAD_POOL.future {
+      // Listen for service record events.
+      val kind = "_" + dto.group + ".apollo."
+      jmdns.addListener(new DNSListener {
+        def updateRecord(jmdns: JmDNS, now: Long, record: DNSRecord) {
+          if ( record.getName.endsWith(kind) ) {
+            record match {
+              case record:DNSRecord.Text =>
+                var member = JsonCodec.decode(new Buffer(record.text), classOf[ClusterMemberDTO])
+                if( record.isExpired(Broker.now) ) {
+                  members.remove(member.id)
+                } else {
+                  members.put(member.id, member)
+                }
+                if ( listener!=null ) {
+                  listener.on_membership_change(members.values.toSet)
+                }
+              case _ =>
+            }
+          }
+        }
+      }, null)
+    }.onComplete(_ => on_completed.run() )
+
+    // Every second, register/unregister services so jmdns matches local_services.
+    schedule_reoccurring(1, TimeUnit.SECONDS) {
+      val mdns = jmdns
+      if( mdns != null ) {
+        val next = local_services.toSet
+        val (added, _, removed) = diff(localservices, next)
+        localservices = next
+        if( !added.isEmpty || !removed.isEmpty) {
+          Broker.BLOCKABLE_THREAD_POOL.future {
+            for( s <- added ) mdns.registerService(s)
+            for( s <- removed ) mdns.unregisterService(s)
+          }
+        }
+      }
+    }
+  }
+
+  // Builds one ServiceInfo per fully resolved `self` entry of the network manager configs.
+  def local_services:Seq[ServiceInfo] = {
+    import collection.JavaConversions._
+    val kind = "_" + dto.group + ".apollo."
+    val rc = ListBuffer[ServiceInfo]()
+    broker.config.services.foreach(_ match {
+      case x:NetworkManagerDTO=>
+        if(x.self!=null ) {
+          // multiple hosts??
+          if(NetworkManager.has_variables(x.self.id)) {
+            for( host <- broker.virtual_hosts.values.toIterable ) {
+              var resolved = NetworkManager.resolve_variables(x.self, broker, host)
+              if( !NetworkManager.has_variables(resolved) ) {
+                rc += new ServiceInfo(kind, host.id, 0, 0, 0, JsonCodec.encode(resolved).toByteArray)
+              }
+            }
+          } else {
+            var resolved = NetworkManager.resolve_variables(x.self, broker, null)
+            if( !NetworkManager.has_variables(resolved) ) {
+              rc += new ServiceInfo(kind, x.self.id, 0, 0, 0, JsonCodec.encode(resolved).toByteArray)
+            }
+          }
+        }
+      case _ =>
+    })
+    rc
+  }
+
+  protected def _stop(on_completed: Task) = {
+    Broker.BLOCKABLE_THREAD_POOL.future {
+      val mdns = jmdns
+      jmdns = null
+      if( mdns != null ) mdns.close()
+    }.onComplete(x => on_completed.run())
+  }
+}
Modified: activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/ClusterMemberDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/ClusterMemberDTO.java?rev=1362190&r1=1362189&r2=1362190&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/ClusterMemberDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/ClusterMemberDTO.java Mon Jul 16 18:55:29 2012
@@ -31,6 +31,9 @@ public class ClusterMemberDTO {
@XmlAttribute(name="id")
public String id;
+ @XmlAttribute(name="cluster")
+ public String cluster;
+
@XmlElement(name="service")
public ArrayList<ClusterServiceDTO> services = new ArrayList<ClusterServiceDTO>();
@@ -42,6 +45,7 @@ public class ClusterMemberDTO {
ClusterMemberDTO that = (ClusterMemberDTO) o;
if (id != null ? !id.equals(that.id) : that.id != null) return false;
+ if (cluster != null ? !cluster.equals(that.cluster) : that.cluster != null) return false;
if (services != null ? !services.equals(that.services) : that.services != null)
return false;
@@ -51,6 +55,7 @@ public class ClusterMemberDTO {
@Override
public int hashCode() {
int result = id != null ? id.hashCode() : 0;
+ result = 31 * result + (cluster != null ? cluster.hashCode() : 0);
result = 31 * result + (services != null ? services.hashCode() : 0);
return result;
}
Modified: activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/Module.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/Module.scala?rev=1362190&r1=1362189&r2=1362190&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/Module.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/Module.scala Mon Jul 16 18:55:29 2012
@@ -26,7 +26,8 @@ class Module extends DtoModule {
def dto_package = "org.apache.activemq.apollo.broker.network.dto"
def extension_classes = Array(
classOf[NetworkManagerDTO],
- classOf[JVMMembershipMonitorDTO]
+ classOf[JVMMembershipMonitorDTO],
+ classOf[ZeroconfMembershipMonitorDTO]
)
}
\ No newline at end of file
Copied: activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/ZeroconfMembershipMonitorDTO.java (from r1355565, activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/Module.scala)
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/ZeroconfMembershipMonitorDTO.java?p2=activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/ZeroconfMembershipMonitorDTO.java&p1=activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/Module.scala&r1=1355565&r2=1362190&rev=1362190&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/Module.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/ZeroconfMembershipMonitorDTO.java Mon Jul 16 18:55:29 2012
@@ -1,5 +1,3 @@
-package org.apache.activemq.apollo.broker.network.dto
-
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
@@ -16,17 +14,26 @@ package org.apache.activemq.apollo.broke
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-import org.apache.activemq.apollo.util.DtoModule
+package org.apache.activemq.apollo.broker.network.dto;
+
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlAttribute;
+import javax.xml.bind.annotation.XmlRootElement;
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-class Module extends DtoModule {
+@XmlRootElement(name="zeroconf_membership")
+@XmlAccessorType(XmlAccessType.FIELD)
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class ZeroconfMembershipMonitorDTO extends MembershipMonitorDTO {
- def dto_package = "org.apache.activemq.apollo.broker.network.dto"
- def extension_classes = Array(
- classOf[NetworkManagerDTO],
- classOf[JVMMembershipMonitorDTO]
- )
+ @XmlAttribute
+ public String address;
+ @XmlAttribute
+ public String group;
-}
\ No newline at end of file
+}