You are viewing a plain text version of this content. The canonical link for it is here.
Posted to java-dev@axis.apache.org by az...@apache.org on 2008/10/28 13:18:35 UTC

svn commit: r708551 - in /webservices/axis2/trunk/java/modules: clustering/src/org/apache/axis2/clustering/tribes/ kernel/conf/ kernel/src/org/apache/axis2/clustering/

Author: azeez
Date: Tue Oct 28 05:18:34 2008
New Revision: 708551

URL: http://svn.apache.org/viewvc?rev=708551&view=rev
Log:
Providing parameters to enable/disable at-most-once message processing and preservation of sender message order


Modified:
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/MulticastBasedMembershipScheme.java
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/WkaBasedMembershipScheme.java
    webservices/axis2/trunk/java/modules/kernel/conf/axis2.xml
    webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/ClusteringConstants.java

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/MulticastBasedMembershipScheme.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/MulticastBasedMembershipScheme.java?rev=708551&r1=708550&r2=708551&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/MulticastBasedMembershipScheme.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/MulticastBasedMembershipScheme.java Tue Oct 28 05:18:34 2008
@@ -58,18 +58,22 @@
      */
     private OperationMode mode;
 
-    private MembershipListener membershipListener;
+    //    private MembershipListener membershipListener;
+    private boolean atmostOnceMessageSemantics;
+    private boolean preserverMsgOrder;
 
     public MulticastBasedMembershipScheme(ManagedChannel channel,
                                           OperationMode mode,
                                           Map<String, Parameter> parameters,
                                           byte[] domain,
-                                          MembershipListener membershipListener) {
+                                          boolean atmostOnceMessageSemantics,
+                                          boolean preserverMsgOrder) {
         this.channel = channel;
         this.mode = mode;
         this.parameters = parameters;
         this.domain = domain;
-        this.membershipListener = membershipListener;
+        this.atmostOnceMessageSemantics = atmostOnceMessageSemantics;
+        this.preserverMsgOrder = preserverMsgOrder;
     }
 
     public void init() throws ClusteringFault {
@@ -165,25 +169,29 @@
         }
 
         // Add the NonBlockingCoordinator.
-        channel.addInterceptor(new Axis2Coordinator(membershipListener));
+//        channel.addInterceptor(new Axis2Coordinator(membershipListener));
 
         channel.getMembershipService().setDomain(domain);
         mode.addInterceptors(channel);
 
-        // Add a AtMostOnceInterceptor to support at-most-once message processing semantics
-        AtMostOnceInterceptor atMostOnceInterceptor = new AtMostOnceInterceptor();
-        atMostOnceInterceptor.setOptionFlag(TribesConstants.AT_MOST_ONCE_OPTION);
-        channel.addInterceptor(atMostOnceInterceptor);
-        if (log.isDebugEnabled()) {
-            log.debug("Added At-most-once Interceptor");
+        if (atmostOnceMessageSemantics) {
+            // Add a AtMostOnceInterceptor to support at-most-once message processing semantics
+            AtMostOnceInterceptor atMostOnceInterceptor = new AtMostOnceInterceptor();
+            atMostOnceInterceptor.setOptionFlag(TribesConstants.AT_MOST_ONCE_OPTION);
+            channel.addInterceptor(atMostOnceInterceptor);
+            if (log.isDebugEnabled()) {
+                log.debug("Added At-most-once Interceptor");
+            }
         }
 
-        // Add the OrderInterceptor to preserve sender ordering
-        OrderInterceptor orderInterceptor = new OrderInterceptor();
-        orderInterceptor.setOptionFlag(TribesConstants.MSG_ORDER_OPTION);
-        channel.addInterceptor(orderInterceptor);
-        if (log.isDebugEnabled()) {
-            log.debug("Added Message Order Interceptor");
+        if (preserverMsgOrder) {
+            // Add the OrderInterceptor to preserve sender ordering
+            OrderInterceptor orderInterceptor = new OrderInterceptor();
+            orderInterceptor.setOptionFlag(TribesConstants.MSG_ORDER_OPTION);
+            channel.addInterceptor(orderInterceptor);
+            if (log.isDebugEnabled()) {
+                log.debug("Added Message Order Interceptor");
+            }
         }
     }
 

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java?rev=708551&r1=708550&r2=708551&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java Tue Oct 28 05:18:34 2008
@@ -19,16 +19,16 @@
 
 package org.apache.axis2.clustering.tribes;
 
-import org.apache.axiom.om.OMElement;
 import org.apache.axiom.om.OMAttribute;
+import org.apache.axiom.om.OMElement;
 import org.apache.axis2.AxisFault;
 import org.apache.axis2.clustering.ClusterManager;
 import org.apache.axis2.clustering.ClusteringConstants;
 import org.apache.axis2.clustering.ClusteringFault;
 import org.apache.axis2.clustering.LoadBalanceEventHandler;
+import org.apache.axis2.clustering.MembershipListener;
 import org.apache.axis2.clustering.MembershipScheme;
 import org.apache.axis2.clustering.RequestBlockingHandler;
-import org.apache.axis2.clustering.MembershipListener;
 import org.apache.axis2.clustering.configuration.ConfigurationManager;
 import org.apache.axis2.clustering.configuration.DefaultConfigurationManager;
 import org.apache.axis2.clustering.context.ClusteringContextListener;
@@ -60,14 +60,13 @@
 import javax.xml.namespace.QName;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
-import java.util.Iterator;
-import java.lang.reflect.Method;
-import java.lang.reflect.InvocationTargetException;
 
 /**
  * The main ClusterManager class for the Tribes based clustering implementation
@@ -231,7 +230,6 @@
         }
         Parameter isActiveParam = getParameter(ClusteringConstants.Parameters.IS_ACTIVE);
         if (isActiveParam != null) {
-            System.out.println("##### isActive=" + isActiveParam.getValue());
             memberInfo.setProperty(ClusteringConstants.Parameters.IS_ACTIVE,
                                    (String) isActiveParam.getValue());
         }
@@ -401,14 +399,19 @@
         String scheme = getMembershipScheme();
         log.info("Using " + scheme + " based membership management scheme");
         if (scheme.equals(ClusteringConstants.MembershipScheme.WKA_BASED)) {
-            membershipScheme = new WkaBasedMembershipScheme(channel, mode,
-                                                            membershipManagers,
-                                                            primaryMembershipManager,
-                                                            parameters, localDomain, members,
-                                                            membershipListener);
+            membershipScheme =
+                    new WkaBasedMembershipScheme(channel, mode,
+                                                 membershipManagers,
+                                                 primaryMembershipManager,
+                                                 parameters, localDomain, members,
+                                                 getBooleanParam(ClusteringConstants.Parameters.ATMOST_ONCE_MSG_SEMANTICS),
+                                                 getBooleanParam(ClusteringConstants.Parameters.PRESERVE_MSG_ORDER));
         } else if (scheme.equals(ClusteringConstants.MembershipScheme.MULTICAST_BASED)) {
-            membershipScheme = new MulticastBasedMembershipScheme(channel, mode, parameters,
-                                                                  localDomain, membershipListener);
+            membershipScheme =
+                    new MulticastBasedMembershipScheme(channel, mode, parameters,
+                                                       localDomain,
+                                                       getBooleanParam(ClusteringConstants.Parameters.ATMOST_ONCE_MSG_SEMANTICS),
+                                                       getBooleanParam(ClusteringConstants.Parameters.PRESERVE_MSG_ORDER));
         } else {
             String msg = "Invalid membership scheme '" + scheme +
                          "'. Supported schemes are multicast & wka";
@@ -418,15 +421,28 @@
         membershipScheme.init();
     }
 
+    private boolean getBooleanParam(String name) {
+        // Reads the named clustering parameter as a boolean flag.
+        // A missing parameter or a null value yields false.
+        Parameter parameter = getParameter(name);
+        Object value = (parameter != null) ? parameter.getValue() : null;
+        if (value == null) {
+            return false;
+        }
+        // The value is held as an Object; use toString() rather than a blind String cast
+        return Boolean.parseBoolean(value.toString().trim());
+    }
+
     /**
      * Find and invoke the setter method with the name of form setXXX passing in the value given
      * on the POJO object
+     *
      * @param name name of the setter field
-     * @param val value to be set
-     * @param obj POJO instance
+     * @param val  value to be set
+     * @param obj  POJO instance
      * @throws ClusteringFault If an error occurs while setting the property
      */
-    public void setInstanceProperty(String name, Object val, Object obj) throws ClusteringFault {
+    private void setInstanceProperty(String name, Object val, Object obj) throws ClusteringFault {
 
         String mName = "set" + Character.toUpperCase(name.charAt(0)) + name.substring(1);
         Method method;
@@ -477,14 +493,14 @@
 
             if (!invoked) {
                 handleException("Did not find a setter method named : " + mName +
-                    "() that takes a single String, int, long, float, double " +
-                    "or boolean parameter");
+                                "() that takes a single String, int, long, float, double " +
+                                "or boolean parameter");
             }
 
         } catch (Exception e) {
             handleException("Error invoking setter method named : " + mName +
-                "() that takes a single String, int, long, float, double " +
-                "or boolean parameter", e);
+                            "() that takes a single String, int, long, float, double " +
+                            "or boolean parameter", e);
         }
     }
 

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/WkaBasedMembershipScheme.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/WkaBasedMembershipScheme.java?rev=708551&r1=708550&r2=708551&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/WkaBasedMembershipScheme.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/WkaBasedMembershipScheme.java Tue Oct 28 05:18:34 2008
@@ -26,14 +26,12 @@
 import org.apache.axis2.util.Utils;
 import org.apache.catalina.tribes.Channel;
 import org.apache.catalina.tribes.ManagedChannel;
-import org.apache.catalina.tribes.ChannelInterceptor;
 import org.apache.catalina.tribes.group.Response;
 import org.apache.catalina.tribes.group.RpcChannel;
 import org.apache.catalina.tribes.group.interceptors.OrderInterceptor;
 import org.apache.catalina.tribes.group.interceptors.StaticMembershipInterceptor;
 import org.apache.catalina.tribes.group.interceptors.TcpFailureDetector;
 import org.apache.catalina.tribes.group.interceptors.TcpPingInterceptor;
-import org.apache.catalina.tribes.group.interceptors.NonBlockingCoordinator;
 import org.apache.catalina.tribes.membership.StaticMember;
 import org.apache.catalina.tribes.transport.ReceiverBase;
 import org.apache.commons.logging.Log;
@@ -82,7 +80,8 @@
      */
     private OperationMode mode;
 
-    private MembershipListener membershipListener;
+    private boolean atmostOnceMessageSemantics;
+    private boolean preserverMsgOrder;
 
     public WkaBasedMembershipScheme(ManagedChannel channel,
                                     OperationMode mode,
@@ -91,7 +90,8 @@
                                     Map<String, Parameter> parameters,
                                     byte[] domain,
                                     List<Member> members,
-                                    MembershipListener membershipListener) {
+                                    boolean atmostOnceMessageSemantics,
+                                    boolean preserverMsgOrder) {
         this.channel = channel;
         this.mode = mode;
         this.applicationDomainMembershipManagers = applicationDomainMembershipManagers;
@@ -99,7 +99,8 @@
         this.parameters = parameters;
         this.localDomain = domain;
         this.members = members;
-        this.membershipListener = membershipListener;
+        this.atmostOnceMessageSemantics = atmostOnceMessageSemantics;
+        this.preserverMsgOrder = preserverMsgOrder;
     }
 
     /**
@@ -298,8 +299,8 @@
         // Add a reliable failure detector
         TcpFailureDetector tcpFailureDetector = new TcpFailureDetector();
 //        tcpFailureDetector.setPrevious(dfi); //TODO: check this
-//        tcpFailureDetector.setReadTestTimeout(30000);
-        tcpFailureDetector.setConnectTimeout(30000);
+        tcpFailureDetector.setReadTestTimeout(120000);
+        tcpFailureDetector.setConnectTimeout(60000);
         channel.addInterceptor(tcpFailureDetector);
         if (log.isDebugEnabled()) {
             log.debug("Added TCP Failure Detector");
@@ -307,7 +308,7 @@
 
         // Add the NonBlockingCoordinator.
 //        channel.addInterceptor(new Axis2Coordinator(membershipListener));
-        
+
         staticMembershipInterceptor = new StaticMembershipInterceptor();
         staticMembershipInterceptor.setLocalMember(primaryMembershipManager.getLocalMember());
         primaryMembershipManager.setStaticMembershipInterceptor(staticMembershipInterceptor);
@@ -319,20 +320,24 @@
         channel.getMembershipService().setDomain(localDomain);
         mode.addInterceptors(channel);
 
-        // Add a AtMostOnceInterceptor to support at-most-once message processing semantics
-        AtMostOnceInterceptor atMostOnceInterceptor = new AtMostOnceInterceptor();
-        atMostOnceInterceptor.setOptionFlag(TribesConstants.AT_MOST_ONCE_OPTION);
-        channel.addInterceptor(atMostOnceInterceptor);
-        if (log.isDebugEnabled()) {
-            log.debug("Added At-most-once Interceptor");
+        if (atmostOnceMessageSemantics) {
+            // Add a AtMostOnceInterceptor to support at-most-once message processing semantics
+            AtMostOnceInterceptor atMostOnceInterceptor = new AtMostOnceInterceptor();
+            atMostOnceInterceptor.setOptionFlag(TribesConstants.AT_MOST_ONCE_OPTION);
+            channel.addInterceptor(atMostOnceInterceptor);
+            if (log.isDebugEnabled()) {
+                log.debug("Added At-most-once Interceptor");
+            }
         }
 
-        // Add the OrderInterceptor to preserve sender ordering
-        OrderInterceptor orderInterceptor = new OrderInterceptor();
-        orderInterceptor.setOptionFlag(TribesConstants.MSG_ORDER_OPTION);
-        channel.addInterceptor(orderInterceptor);
-        if (log.isDebugEnabled()) {
-            log.debug("Added Message Order Interceptor");
+        if (preserverMsgOrder) {
+            // Add the OrderInterceptor to preserve sender ordering
+            OrderInterceptor orderInterceptor = new OrderInterceptor();
+            orderInterceptor.setOptionFlag(TribesConstants.MSG_ORDER_OPTION);
+            channel.addInterceptor(orderInterceptor);
+            if (log.isDebugEnabled()) {
+                log.debug("Added Message Order Interceptor");
+            }
         }
     }
 
@@ -366,7 +371,7 @@
                 new RpcChannel(TribesUtil.getRpcMembershipChannelId(localDomain),
                                channel, new RpcMembershipRequestHandler(primaryMembershipManager,
                                                                         this));
-        if(log.isDebugEnabled()){
+        if (log.isDebugEnabled()) {
             log.debug("Created primary membership channel " + new String(localDomain));
         }
         primaryMembershipManager.setRpcMembershipChannel(rpcMembershipChannel);

Modified: webservices/axis2/trunk/java/modules/kernel/conf/axis2.xml
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/kernel/conf/axis2.xml?rev=708551&r1=708550&r2=708551&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/kernel/conf/axis2.xml (original)
+++ webservices/axis2/trunk/java/modules/kernel/conf/axis2.xml Tue Oct 28 05:18:34 2008
@@ -359,6 +359,16 @@
         <parameter name="localMemberPort">4000</parameter>
 
         <!--
+        Preserve message ordering. This will be done according to sender order.
+        -->
+        <parameter name="preserveMessageOrder">true</parameter>
+
+        <!--
+        Maintain at-most-once message processing semantics
+        -->
+        <parameter name="atmostOnceMessageSemantics">true</parameter>
+
+        <!--
            The list of static or well-known members. These entries will only be valid if the
            "membershipScheme" above is set to "wka"
         -->

Modified: webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/ClusteringConstants.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/ClusteringConstants.java?rev=708551&r1=708550&r2=708551&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/ClusteringConstants.java (original)
+++ webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/ClusteringConstants.java Tue Oct 28 05:18:34 2008
@@ -100,6 +100,16 @@
         public static final String AVOID_INITIATION = "AvoidInitiation";
 
         /**
+         * Preserve message ordering. This will be done according to sender order
+         */
+        public static final String PRESERVE_MSG_ORDER = "preserveMessageOrder";
+
+        /**
+         * Maintain at-most-once message processing semantics
+         */
+        public static final String ATMOST_ONCE_MSG_SEMANTICS = "atmostOnceMessageSemantics";
+
+        /**
          * Indicates whether this member is ACTIVE or PASSIVE
          */
         public static final String IS_ACTIVE = "isActive";