You are viewing a plain text version of this content. The canonical link for it is here.
Posted to java-dev@axis.apache.org by az...@apache.org on 2008/05/11 08:35:48 UTC

svn commit: r655260 - in /webservices/axis2/trunk/java/modules: clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java kernel/conf/axis2.xml

Author: azeez
Date: Sat May 10 23:35:47 2008
New Revision: 655260

URL: http://svn.apache.org/viewvc?rev=655260&view=rev
Log:
Adding more control over cluster configuration


Modified:
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java
    webservices/axis2/trunk/java/modules/kernel/conf/axis2.xml

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java?rev=655260&r1=655259&r2=655260&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java Sat May 10 23:35:47 2008
@@ -49,17 +49,21 @@
 import org.apache.catalina.tribes.group.RpcChannel;
 import org.apache.catalina.tribes.group.interceptors.DomainFilterInterceptor;
 import org.apache.catalina.tribes.group.interceptors.OrderInterceptor;
+import org.apache.catalina.tribes.group.interceptors.StaticMembershipInterceptor;
 import org.apache.catalina.tribes.group.interceptors.TcpFailureDetector;
+import org.apache.catalina.tribes.membership.StaticMember;
 import org.apache.catalina.tribes.transport.MultiPointSender;
 import org.apache.catalina.tribes.transport.ReceiverBase;
 import org.apache.catalina.tribes.transport.ReplicationTransmitter;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Properties;
 
 public class TribesClusterManager implements ClusterManager {
     public static final int MSG_ORDER_OPTION = 512;
@@ -146,13 +150,6 @@
         MultiPointSender multiPointSender = replicationTransmitter.getTransport();
         multiPointSender.setMaxRetryAttempts(maxRetries);
 
-        // Set the IP address that will be advertised by this node
-        String localIP = System.getProperty(ClusteringConstants.LOCAL_IP_ADDRESS);
-        if (localIP != null) {
-            ReceiverBase receiver = (ReceiverBase) channel.getChannelReceiver();
-            receiver.setAddress(localIP);
-        }
-
         // Set the domain for this Node
         Parameter domainParam = getParameter(ClusteringConstants.DOMAIN);
         byte[] domain;
@@ -162,46 +159,10 @@
             domain = "apache.axis2.domain".getBytes();
         }
 
-        // Add a DomainFilterInterceptor
-        channel.getMembershipService().setDomain(domain);
-        DomainFilterInterceptor dfi = new DomainFilterInterceptor();
-        dfi.setDomain(domain);
-        channel.addInterceptor(dfi);
-
-        // Add the NonBlockingCoordinator. This is used for leader election
-        /*nbc = new NonBlockingCoordinator() {
-            public void fireInterceptorEvent(InterceptorEvent event) {
-                String status = event.getEventTypeDesc();
-                System.err.println("$$$$$$$$$$$$ NBC status=" + status);
-                int type = event.getEventType();
-            }
-        };
-        nbc.setPrevious(dfi);
-        channel.addInterceptor(nbc);*/
-
-        /*Properties mcastProps = channel.getMembershipService().getProperties();
-       mcastProps.setProperty("mcastPort", "5555");
-       mcastProps.setProperty("mcastAddress", "224.10.10.10");
-       mcastProps.setProperty("mcastClusterDomain", "catalina");
-       mcastProps.setProperty("bindAddress", "localhost");
-       mcastProps.setProperty("memberDropTime", "20000");
-       mcastProps.setProperty("mcastFrequency", "500");
-       mcastProps.setProperty("tcpListenPort", "4000");
-       mcastProps.setProperty("tcpListenHost", "127.0.0.1");*/
-
-        // Add the OrderInterceptor to preserve sender ordering 
-        OrderInterceptor orderInterceptor = new OrderInterceptor();
-        orderInterceptor.setOptionFlag(MSG_ORDER_OPTION);
-        channel.addInterceptor(orderInterceptor);
-
-        // Add a AtMostOnceInterceptor to support at-most-once message processing semantics
-        AtMostOnceInterceptor atMostOnceInterceptor = new AtMostOnceInterceptor();
-        channel.addInterceptor(atMostOnceInterceptor);
+        // Add all the ChannelInterceptors
+        addInterceptors(channel, domain);
 
-        // Add a reliable failure detector
-        TcpFailureDetector tcpFailureDetector = new TcpFailureDetector();
-        tcpFailureDetector.setPrevious(dfi);
-        channel.addInterceptor(tcpFailureDetector);
+        configureMulticastParameters(channel);
 
         channel.addChannelListener(channelListener);
 
@@ -250,6 +211,95 @@
                 setNonReplicableProperty(ClusteringConstants.CLUSTER_INITIALIZED, "true");
     }
 
+    private void addInterceptors(ManagedChannel channel, byte[] domain) {
+        // Add a DomainFilterInterceptor
+        channel.getMembershipService().setDomain(domain);
+        DomainFilterInterceptor dfi = new DomainFilterInterceptor();
+        dfi.setDomain(domain);
+        channel.addInterceptor(dfi);
+
+        // Add the NonBlockingCoordinator. This is used for leader election
+        /*nbc = new NonBlockingCoordinator() {
+            public void fireInterceptorEvent(InterceptorEvent event) {
+                String status = event.getEventTypeDesc();
+                System.err.println("$$$$$$$$$$$$ NBC status=" + status);
+                int type = event.getEventType();
+            }
+        };
+        nbc.setPrevious(dfi);
+        channel.addInterceptor(nbc);*/
+
+        // Add the OrderInterceptor to preserve sender ordering
+        OrderInterceptor orderInterceptor = new OrderInterceptor();
+        orderInterceptor.setOptionFlag(MSG_ORDER_OPTION);
+        channel.addInterceptor(orderInterceptor);
+
+        // Add an AtMostOnceInterceptor to support at-most-once message processing semantics
+        AtMostOnceInterceptor atMostOnceInterceptor = new AtMostOnceInterceptor();
+        channel.addInterceptor(atMostOnceInterceptor);
+
+        // Add a reliable failure detector
+        TcpFailureDetector tcpFailureDetector = new TcpFailureDetector();
+        tcpFailureDetector.setPrevious(dfi);
+        channel.addInterceptor(tcpFailureDetector);
+
+//        if(memberDiscoverMode = WKA){
+//            TcpPing
+//            TcpFailure
+//            StaticMembership
+//        }
+        /*StaticMembershipInterceptor staticMembershipInterceptor = new StaticMembershipInterceptor();
+        channel.addInterceptor(staticMembershipInterceptor);
+        try {
+            staticMembershipInterceptor.addStaticMember(new StaticMember("10.100.1.190", 4000, 10));
+        } catch (IOException e) {
+            e.printStackTrace();
+        }*/
+    }
+
+    private void configureMulticastParameters(ManagedChannel channel) {
+        Properties mcastProps = channel.getMembershipService().getProperties();
+        Parameter mcastAddress = getParameter("multicastAddress");
+        if (mcastAddress != null) {
+            mcastProps.setProperty("mcastAddress", ((String) mcastAddress.getValue()).trim());
+        }
+        Parameter mcastPort = getParameter("multicastPort");
+        if (mcastPort != null) {
+            mcastProps.setProperty("mcastPort", ((String) mcastPort.getValue()).trim());
+        }
+        Parameter mcastFrequency = getParameter("multicastFrequency");
+        if (mcastFrequency != null) {
+            mcastProps.setProperty("mcastFrequency", ((String) mcastFrequency.getValue()).trim());
+        }
+        Parameter mcastMemberDropTime = getParameter("multicastMemberDropTime");
+        if (mcastMemberDropTime != null) {
+            mcastProps.setProperty("memberDropTime", ((String) mcastMemberDropTime.getValue()).trim());
+        }
+
+        // Set the IP address that will be advertised by this node
+        ReceiverBase receiver = (ReceiverBase) channel.getChannelReceiver();
+        Parameter tcpListenHost = getParameter("tcpListenHost");
+        if(tcpListenHost != null){
+            String host = ((String) tcpListenHost.getValue()).trim();
+            mcastProps.setProperty("tcpListenHost", host);
+            mcastProps.setProperty("bindAddress", host);
+            receiver.setAddress(host);
+        }
+        String localIP = System.getProperty(ClusteringConstants.LOCAL_IP_ADDRESS);
+        if (localIP != null) {
+            receiver.setAddress(localIP);
+        }
+
+        Parameter tcpListenPort = getParameter("tcpListenPort");
+        if(tcpListenPort != null){
+            String port = ((String) tcpListenPort.getValue()).trim();
+            mcastProps.setProperty("tcpListenPort", port);
+            receiver.setPort(Integer.parseInt(port));
+        }
+
+        /*mcastProps.setProperty("mcastClusterDomain", "catalina");*/
+    }
+
     /**
      * Get some information from a neighbour. This information will be used by this node to
      * initialize itself
@@ -269,7 +319,9 @@
         List sentMembersList = new ArrayList();
         sentMembersList.add(TribesUtil.getLocalHost(channel));
         Member[] members = membershipManager.getMembers();
-        if(members.length == 0) return;
+        if (members.length == 0) {
+            return;
+        }
 
         while (members.length > 0 && numberOfTries < 5) {
             Member member = (numberOfTries == 0) ?
@@ -301,7 +353,7 @@
             }
             numberOfTries++;
             members = membershipManager.getMembers();
-            if(numberOfTries >= members.length){
+            if (numberOfTries >= members.length) {
                 break;
             }
         }

Modified: webservices/axis2/trunk/java/modules/kernel/conf/axis2.xml
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/kernel/conf/axis2.xml?rev=655260&r1=655259&r2=655260&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/kernel/conf/axis2.xml (original)
+++ webservices/axis2/trunk/java/modules/kernel/conf/axis2.xml Sat May 10 23:35:47 2008
@@ -250,6 +250,15 @@
         <parameter name="domain">apache.axis2.domain</parameter>
         <parameter name="synchronizeAll">true</parameter>
         <parameter name="maxRetries">10</parameter>
+
+        <parameter name="multicastAddress">228.0.0.4</parameter>
+        <parameter name="multicastPort">45564</parameter>
+        <parameter name="multicastFrequency">500</parameter>
+        <parameter name="multicastMemberDropTime">3000</parameter>
+
+        <parameter name="tcpListenHost">127.0.0.1</parameter>
+        <parameter name="tcpListenPort">4000</parameter>
+
     	<configurationManager class="org.apache.axis2.cluster.configuration.TribesConfigurationManager">
     	    <listener class="org.apache.axis2.cluster.configuration.DefaultConfigurationManagerListener"/>
     	</configurationManager>



---------------------------------------------------------------------
To unsubscribe, e-mail: axis-cvs-unsubscribe@ws.apache.org
For additional commands, e-mail: axis-cvs-help@ws.apache.org