You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/07/16 20:55:29 UTC

svn commit: r1362190 - in /activemq/activemq-apollo/trunk/apollo-network: ./ src/main/resources/META-INF/services/org.apache.activemq.apollo/ src/main/resources/org/apache/activemq/apollo/broker/network/dto/ src/main/scala/org/apache/activemq/apollo/br...

Author: chirino
Date: Mon Jul 16 18:55:29 2012
New Revision: 1362190

URL: http://svn.apache.org/viewvc?rev=1362190&view=rev
Log:
Added a Zeroconf based cluster membership monitor.

Added:
    activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/ZeroconfMembershipMonitor.scala
    activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/ZeroconfMembershipMonitorDTO.java
      - copied, changed from r1355565, activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/Module.scala
Modified:
    activemq/activemq-apollo/trunk/apollo-network/pom.xml
    activemq/activemq-apollo/trunk/apollo-network/src/main/resources/META-INF/services/org.apache.activemq.apollo/cluster-membership-factory.index
    activemq/activemq-apollo/trunk/apollo-network/src/main/resources/org/apache/activemq/apollo/broker/network/dto/jaxb.index
    activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/NetworkManager.scala
    activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/ClusterMemberDTO.java
    activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/Module.scala

Modified: activemq/activemq-apollo/trunk/apollo-network/pom.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-network/pom.xml?rev=1362190&r1=1362189&r2=1362190&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-network/pom.xml (original)
+++ activemq/activemq-apollo/trunk/apollo-network/pom.xml Mon Jul 16 18:55:29 2012
@@ -58,6 +58,14 @@
       <version>1.11</version>
     </dependency>
 
+    <!-- Optional Dependencies -->
+    <dependency>
+      <groupId>org.apache.activemq</groupId>
+      <artifactId>activemq-jmdns_1.0</artifactId>
+      <version>5.7-SNAPSHOT</version>
+      <optional>true</optional>
+    </dependency>
+
     <!-- Testing Dependencies -->
     <dependency>
       <groupId>org.scalatest</groupId>

Modified: activemq/activemq-apollo/trunk/apollo-network/src/main/resources/META-INF/services/org.apache.activemq.apollo/cluster-membership-factory.index
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-network/src/main/resources/META-INF/services/org.apache.activemq.apollo/cluster-membership-factory.index?rev=1362190&r1=1362189&r2=1362190&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-network/src/main/resources/META-INF/services/org.apache.activemq.apollo/cluster-membership-factory.index (original)
+++ activemq/activemq-apollo/trunk/apollo-network/src/main/resources/META-INF/services/org.apache.activemq.apollo/cluster-membership-factory.index Mon Jul 16 18:55:29 2012
@@ -14,4 +14,5 @@
 ## See the License for the specific language governing permissions and
 ## limitations under the License.
 ## ---------------------------------------------------------------------------
-org.apache.activemq.apollo.broker.network.JVMMembershipMonitorFactory
\ No newline at end of file
+org.apache.activemq.apollo.broker.network.JVMMembershipMonitorFactory
+org.apache.activemq.apollo.broker.network.ZeroconfMembershipMonitorFactory
\ No newline at end of file

Modified: activemq/activemq-apollo/trunk/apollo-network/src/main/resources/org/apache/activemq/apollo/broker/network/dto/jaxb.index
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-network/src/main/resources/org/apache/activemq/apollo/broker/network/dto/jaxb.index?rev=1362190&r1=1362189&r2=1362190&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-network/src/main/resources/org/apache/activemq/apollo/broker/network/dto/jaxb.index (original)
+++ activemq/activemq-apollo/trunk/apollo-network/src/main/resources/org/apache/activemq/apollo/broker/network/dto/jaxb.index Mon Jul 16 18:55:29 2012
@@ -17,4 +17,5 @@
 NetworkManagerDTO
 BridgeDTO
 MembershipMonitorDTO
-JVMMembershipMonitorDTO
\ No newline at end of file
+JVMMembershipMonitorDTO
+ZeroconfMembershipMonitorDTO
\ No newline at end of file

Modified: activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/NetworkManager.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/NetworkManager.scala?rev=1362190&r1=1362189&r2=1362190&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/NetworkManager.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/NetworkManager.scala Mon Jul 16 18:55:29 2012
@@ -69,6 +69,7 @@ object NetworkManager extends Log {
             connector.socket_address match {
               case address:InetSocketAddress =>
                 rc = rc.replaceAllLiterally("{{connector."+id+".port}}", ""+address.getPort)
+              case _ =>
             }
           case _ =>
         }
@@ -121,6 +122,8 @@ class NetworkManager(broker: Broker) ext
       var monitor = MembershipMonitorFactory.create(broker, monitor_dto)
       if(monitor!=null) {
         monitors ::= monitor
+      } else {
+        warn("Could not create the membership monitor for: "+monitor_dto)
       }
     }
 
@@ -146,9 +149,11 @@ class NetworkManager(broker: Broker) ext
   def on_membership_change(value: collection.Set[ClusterMemberDTO]) = dispatch_queue {
     val (added, _, removed) = diff(members, value)
     for( m <- removed ) {
+      info("Broker host left the network: %s", m.id)
       load_monitor.remove(m)
     }
     for( m <- added ) {
+      info("Broker host joined the network: %s", m.id)
       load_monitor.add(m)
     }
     members = value

Added: activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/ZeroconfMembershipMonitor.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/ZeroconfMembershipMonitor.scala?rev=1362190&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/ZeroconfMembershipMonitor.scala (added)
+++ activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/ZeroconfMembershipMonitor.scala Mon Jul 16 18:55:29 2012
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.broker.network
+
+import dto._
+import org.apache.activemq.jmdns._
+import java.io.IOException
+
+import java.net.InetAddress
+import org.apache.activemq.apollo.broker.Broker
+import org.apache.activemq.apollo.util.BaseService
+import org.fusesource.hawtdispatch._
+import org.apache.activemq.apollo.dto.JsonCodec
+import org.fusesource.hawtbuf.Buffer
+import collection.mutable
+import collection.mutable.ListBuffer
+import java.util.concurrent.TimeUnit
+import org.apache.activemq.apollo.util.CollectionsSupport._
+
+/**
+ * <p>
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+object ZeroconfMembershipMonitorFactory extends MembershipMonitorFactory.SPI {
+  def create(broker: Broker, dto: MembershipMonitorDTO): MembershipMonitor = dto match {
+    case dto:ZeroconfMembershipMonitorDTO=> new ZeroconfMembershipMonitor(broker, dto)
+    case _ => null
+  }
+}
+
+/**
+ * <p>
+ *   Uses Zeroconf to discover the cluster members.
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+class ZeroconfMembershipMonitor(val broker: Broker, val dto:ZeroconfMembershipMonitorDTO) extends BaseService with MembershipMonitor {
+
+  def dispatch_queue: DispatchQueue = createQueue()
+  val members = mutable.HashMap[String, ClusterMemberDTO]()
+  var jmdns:JmDNS = _
+
+  var localservices = Set[ServiceInfo]()
+
+  protected def _start(on_completed: Task) = {
+
+    if (dto.group == null) {
+      throw new IOException("You must specify a group to discover")
+    }
+
+    jmdns = new JmDNS(if (dto.address != null) {
+      InetAddress.getByName(dto.address)
+    } else {
+      InetAddress.getLocalHost
+    })
+
+    Broker.BLOCKABLE_THREAD_POOL.future {
+      // Listen for service record events.
+      val kind = "_" + dto.group + ".apollo."
+      jmdns.addListener(new DNSListener {
+        def updateRecord(jmdns: JmDNS, now: Long, record: DNSRecord) {
+          if ( record.getName.endsWith(kind) ) {
+            record match {
+              case record:DNSRecord.Text =>
+                var member = JsonCodec.decode(new Buffer(record.text), classOf[ClusterMemberDTO])
+                if( record.isExpired(Broker.now) ) {
+                  members.remove(member.id)
+                } else {
+                  members.put(member.id, member)
+                }
+                if ( listener!=null ) {
+                  listener.on_membership_change(members.values.toSet)
+                }
+              case _ =>
+            }
+          }
+        }
+      }, null)
+
+    }.onComplete(_ => on_completed.run() )
+
+    schedule_reoccurring(1, TimeUnit.SECONDS) {
+
+      val next = local_services.toSet
+      val (added, _, removed) = diff(localservices, next)
+      localservices = next
+
+      if( !added.isEmpty || !removed.isEmpty) {
+        Broker.BLOCKABLE_THREAD_POOL.future {
+          for( s <- added ) {
+            jmdns.registerService(s)
+          }
+          for( s <- removed ) {
+            jmdns.unregisterService(s)
+          }
+        }
+      }
+    }
+  }
+
+
+  def local_services:Seq[ServiceInfo] = {
+    import collection.JavaConversions._
+    val kind = "_" + dto.group + ".apollo."
+    val rc = ListBuffer[ServiceInfo]()
+    broker.config.services.foreach(_ match {
+      case x:NetworkManagerDTO=>
+        if(x.self!=null ) {
+          // multiple hosts??
+          if(NetworkManager.has_variables(x.self.id)) {
+            for( host <- broker.virtual_hosts.values.toIterable ) {
+              var resolved = NetworkManager.resolve_variables(x.self, broker, host)
+              if( !NetworkManager.has_variables(resolved) ) {
+                rc += new ServiceInfo(kind, host.id, 0, 0, 0, JsonCodec.encode(resolved).toByteArray)
+              }
+            }
+          } else {
+            var resolved = NetworkManager.resolve_variables(x.self, broker, null)
+            if( !NetworkManager.has_variables(resolved) ) {
+              rc += new ServiceInfo(kind, x.self.id, 0, 0, 0, JsonCodec.encode(resolved).toByteArray)
+            }
+          }
+        }
+      case _ =>
+    })
+    rc
+  }
+
+  protected def _stop(on_completed: Task) = {
+    Broker.BLOCKABLE_THREAD_POOL.future {
+      jmdns.close()
+      jmdns = null;
+    }.onComplete(x => on_completed.run())
+  }
+}

Modified: activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/ClusterMemberDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/ClusterMemberDTO.java?rev=1362190&r1=1362189&r2=1362190&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/ClusterMemberDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/ClusterMemberDTO.java Mon Jul 16 18:55:29 2012
@@ -31,6 +31,9 @@ public class ClusterMemberDTO {
     @XmlAttribute(name="id")
     public String id;
 
+    @XmlAttribute(name="cluster")
+    public String cluster;
+
     @XmlElement(name="service")
     public ArrayList<ClusterServiceDTO> services = new ArrayList<ClusterServiceDTO>();
 
@@ -42,6 +45,7 @@ public class ClusterMemberDTO {
         ClusterMemberDTO that = (ClusterMemberDTO) o;
 
         if (id != null ? !id.equals(that.id) : that.id != null) return false;
+        if (cluster != null ? !cluster.equals(that.id) : that.cluster != null) return false;
         if (services != null ? !services.equals(that.services) : that.services != null)
             return false;
 
@@ -51,6 +55,7 @@ public class ClusterMemberDTO {
     @Override
     public int hashCode() {
         int result = id != null ? id.hashCode() : 0;
+        result = 31 * result + (cluster != null ? cluster.hashCode() : 0);
         result = 31 * result + (services != null ? services.hashCode() : 0);
         return result;
     }

Modified: activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/Module.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/Module.scala?rev=1362190&r1=1362189&r2=1362190&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/Module.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/Module.scala Mon Jul 16 18:55:29 2012
@@ -26,7 +26,8 @@ class Module extends DtoModule {
   def dto_package = "org.apache.activemq.apollo.broker.network.dto"
   def extension_classes = Array(
     classOf[NetworkManagerDTO],
-    classOf[JVMMembershipMonitorDTO]
+    classOf[JVMMembershipMonitorDTO],
+    classOf[ZeroconfMembershipMonitorDTO]
   )
 
 }
\ No newline at end of file

Copied: activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/ZeroconfMembershipMonitorDTO.java (from r1355565, activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/Module.scala)
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/ZeroconfMembershipMonitorDTO.java?p2=activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/ZeroconfMembershipMonitorDTO.java&p1=activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/Module.scala&r1=1355565&r2=1362190&rev=1362190&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/Module.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/ZeroconfMembershipMonitorDTO.java Mon Jul 16 18:55:29 2012
@@ -1,5 +1,3 @@
-package org.apache.activemq.apollo.broker.network.dto
-
 /**
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
@@ -16,17 +14,26 @@ package org.apache.activemq.apollo.broke
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-import org.apache.activemq.apollo.util.DtoModule
+package org.apache.activemq.apollo.broker.network.dto;
+
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlAttribute;
+import javax.xml.bind.annotation.XmlRootElement;
 
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-class Module extends DtoModule {
+@XmlRootElement(name="zeroconf_membership")
+@XmlAccessorType(XmlAccessType.FIELD)
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class ZeroconfMembershipMonitorDTO extends MembershipMonitorDTO {
 
-  def dto_package = "org.apache.activemq.apollo.broker.network.dto"
-  def extension_classes = Array(
-    classOf[NetworkManagerDTO],
-    classOf[JVMMembershipMonitorDTO]
-  )
+    @XmlAttribute
+    public String address;
+    @XmlAttribute
+    public String group;
 
-}
\ No newline at end of file
+}