You are viewing a plain text version of this content. The canonical link for it is here.
Posted to java-dev@axis.apache.org by az...@apache.org on 2008/10/28 13:18:35 UTC
svn commit: r708551 - in /webservices/axis2/trunk/java/modules:
clustering/src/org/apache/axis2/clustering/tribes/ kernel/conf/
kernel/src/org/apache/axis2/clustering/
Author: azeez
Date: Tue Oct 28 05:18:34 2008
New Revision: 708551
URL: http://svn.apache.org/viewvc?rev=708551&view=rev
Log:
Providing params to enable/disable atmost once message processing & preserving sender order
Modified:
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/MulticastBasedMembershipScheme.java
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/WkaBasedMembershipScheme.java
webservices/axis2/trunk/java/modules/kernel/conf/axis2.xml
webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/ClusteringConstants.java
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/MulticastBasedMembershipScheme.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/MulticastBasedMembershipScheme.java?rev=708551&r1=708550&r2=708551&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/MulticastBasedMembershipScheme.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/MulticastBasedMembershipScheme.java Tue Oct 28 05:18:34 2008
@@ -58,18 +58,22 @@
*/
private OperationMode mode;
- private MembershipListener membershipListener;
+ // private MembershipListener membershipListener;
+ private boolean atmostOnceMessageSemantics;
+ private boolean preserverMsgOrder;
public MulticastBasedMembershipScheme(ManagedChannel channel,
OperationMode mode,
Map<String, Parameter> parameters,
byte[] domain,
- MembershipListener membershipListener) {
+ boolean atmostOnceMessageSemantics,
+ boolean preserverMsgOrder) {
this.channel = channel;
this.mode = mode;
this.parameters = parameters;
this.domain = domain;
- this.membershipListener = membershipListener;
+ this.atmostOnceMessageSemantics = atmostOnceMessageSemantics;
+ this.preserverMsgOrder = preserverMsgOrder;
}
public void init() throws ClusteringFault {
@@ -165,25 +169,29 @@
}
// Add the NonBlockingCoordinator.
- channel.addInterceptor(new Axis2Coordinator(membershipListener));
+// channel.addInterceptor(new Axis2Coordinator(membershipListener));
channel.getMembershipService().setDomain(domain);
mode.addInterceptors(channel);
- // Add a AtMostOnceInterceptor to support at-most-once message processing semantics
- AtMostOnceInterceptor atMostOnceInterceptor = new AtMostOnceInterceptor();
- atMostOnceInterceptor.setOptionFlag(TribesConstants.AT_MOST_ONCE_OPTION);
- channel.addInterceptor(atMostOnceInterceptor);
- if (log.isDebugEnabled()) {
- log.debug("Added At-most-once Interceptor");
+ if (atmostOnceMessageSemantics) {
+ // Add a AtMostOnceInterceptor to support at-most-once message processing semantics
+ AtMostOnceInterceptor atMostOnceInterceptor = new AtMostOnceInterceptor();
+ atMostOnceInterceptor.setOptionFlag(TribesConstants.AT_MOST_ONCE_OPTION);
+ channel.addInterceptor(atMostOnceInterceptor);
+ if (log.isDebugEnabled()) {
+ log.debug("Added At-most-once Interceptor");
+ }
}
- // Add the OrderInterceptor to preserve sender ordering
- OrderInterceptor orderInterceptor = new OrderInterceptor();
- orderInterceptor.setOptionFlag(TribesConstants.MSG_ORDER_OPTION);
- channel.addInterceptor(orderInterceptor);
- if (log.isDebugEnabled()) {
- log.debug("Added Message Order Interceptor");
+ if (preserverMsgOrder) {
+ // Add the OrderInterceptor to preserve sender ordering
+ OrderInterceptor orderInterceptor = new OrderInterceptor();
+ orderInterceptor.setOptionFlag(TribesConstants.MSG_ORDER_OPTION);
+ channel.addInterceptor(orderInterceptor);
+ if (log.isDebugEnabled()) {
+ log.debug("Added Message Order Interceptor");
+ }
}
}
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java?rev=708551&r1=708550&r2=708551&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java Tue Oct 28 05:18:34 2008
@@ -19,16 +19,16 @@
package org.apache.axis2.clustering.tribes;
-import org.apache.axiom.om.OMElement;
import org.apache.axiom.om.OMAttribute;
+import org.apache.axiom.om.OMElement;
import org.apache.axis2.AxisFault;
import org.apache.axis2.clustering.ClusterManager;
import org.apache.axis2.clustering.ClusteringConstants;
import org.apache.axis2.clustering.ClusteringFault;
import org.apache.axis2.clustering.LoadBalanceEventHandler;
+import org.apache.axis2.clustering.MembershipListener;
import org.apache.axis2.clustering.MembershipScheme;
import org.apache.axis2.clustering.RequestBlockingHandler;
-import org.apache.axis2.clustering.MembershipListener;
import org.apache.axis2.clustering.configuration.ConfigurationManager;
import org.apache.axis2.clustering.configuration.DefaultConfigurationManager;
import org.apache.axis2.clustering.context.ClusteringContextListener;
@@ -60,14 +60,13 @@
import javax.xml.namespace.QName;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
+import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
-import java.util.Iterator;
-import java.lang.reflect.Method;
-import java.lang.reflect.InvocationTargetException;
/**
* The main ClusterManager class for the Tribes based clustering implementation
@@ -231,7 +230,6 @@
}
Parameter isActiveParam = getParameter(ClusteringConstants.Parameters.IS_ACTIVE);
if (isActiveParam != null) {
- System.out.println("##### isActive=" + isActiveParam.getValue());
memberInfo.setProperty(ClusteringConstants.Parameters.IS_ACTIVE,
(String) isActiveParam.getValue());
}
@@ -401,14 +399,19 @@
String scheme = getMembershipScheme();
log.info("Using " + scheme + " based membership management scheme");
if (scheme.equals(ClusteringConstants.MembershipScheme.WKA_BASED)) {
- membershipScheme = new WkaBasedMembershipScheme(channel, mode,
- membershipManagers,
- primaryMembershipManager,
- parameters, localDomain, members,
- membershipListener);
+ membershipScheme =
+ new WkaBasedMembershipScheme(channel, mode,
+ membershipManagers,
+ primaryMembershipManager,
+ parameters, localDomain, members,
+ getBooleanParam(ClusteringConstants.Parameters.ATMOST_ONCE_MSG_SEMANTICS),
+ getBooleanParam(ClusteringConstants.Parameters.PRESERVE_MSG_ORDER));
} else if (scheme.equals(ClusteringConstants.MembershipScheme.MULTICAST_BASED)) {
- membershipScheme = new MulticastBasedMembershipScheme(channel, mode, parameters,
- localDomain, membershipListener);
+ membershipScheme =
+ new MulticastBasedMembershipScheme(channel, mode, parameters,
+ localDomain,
+ getBooleanParam(ClusteringConstants.Parameters.ATMOST_ONCE_MSG_SEMANTICS),
+ getBooleanParam(ClusteringConstants.Parameters.PRESERVE_MSG_ORDER));
} else {
String msg = "Invalid membership scheme '" + scheme +
"'. Supported schemes are multicast & wka";
@@ -418,15 +421,28 @@
membershipScheme.init();
}
+ private boolean getBooleanParam(String name) {
+ boolean result = false;
+ Parameter parameter = getParameter(name);
+ if (parameter != null) {
+ Object value = parameter.getValue();
+ if (value != null) {
+ result = Boolean.valueOf(((String) value).trim());
+ }
+ }
+ return result;
+ }
+
/**
* Find and invoke the setter method with the name of form setXXX passing in the value given
* on the POJO object
+ *
* @param name name of the setter field
- * @param val value to be set
- * @param obj POJO instance
+ * @param val value to be set
+ * @param obj POJO instance
* @throws ClusteringFault If an error occurs while setting the property
*/
- public void setInstanceProperty(String name, Object val, Object obj) throws ClusteringFault {
+ private void setInstanceProperty(String name, Object val, Object obj) throws ClusteringFault {
String mName = "set" + Character.toUpperCase(name.charAt(0)) + name.substring(1);
Method method;
@@ -477,14 +493,14 @@
if (!invoked) {
handleException("Did not find a setter method named : " + mName +
- "() that takes a single String, int, long, float, double " +
- "or boolean parameter");
+ "() that takes a single String, int, long, float, double " +
+ "or boolean parameter");
}
} catch (Exception e) {
handleException("Error invoking setter method named : " + mName +
- "() that takes a single String, int, long, float, double " +
- "or boolean parameter", e);
+ "() that takes a single String, int, long, float, double " +
+ "or boolean parameter", e);
}
}
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/WkaBasedMembershipScheme.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/WkaBasedMembershipScheme.java?rev=708551&r1=708550&r2=708551&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/WkaBasedMembershipScheme.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/WkaBasedMembershipScheme.java Tue Oct 28 05:18:34 2008
@@ -26,14 +26,12 @@
import org.apache.axis2.util.Utils;
import org.apache.catalina.tribes.Channel;
import org.apache.catalina.tribes.ManagedChannel;
-import org.apache.catalina.tribes.ChannelInterceptor;
import org.apache.catalina.tribes.group.Response;
import org.apache.catalina.tribes.group.RpcChannel;
import org.apache.catalina.tribes.group.interceptors.OrderInterceptor;
import org.apache.catalina.tribes.group.interceptors.StaticMembershipInterceptor;
import org.apache.catalina.tribes.group.interceptors.TcpFailureDetector;
import org.apache.catalina.tribes.group.interceptors.TcpPingInterceptor;
-import org.apache.catalina.tribes.group.interceptors.NonBlockingCoordinator;
import org.apache.catalina.tribes.membership.StaticMember;
import org.apache.catalina.tribes.transport.ReceiverBase;
import org.apache.commons.logging.Log;
@@ -82,7 +80,8 @@
*/
private OperationMode mode;
- private MembershipListener membershipListener;
+ private boolean atmostOnceMessageSemantics;
+ private boolean preserverMsgOrder;
public WkaBasedMembershipScheme(ManagedChannel channel,
OperationMode mode,
@@ -91,7 +90,8 @@
Map<String, Parameter> parameters,
byte[] domain,
List<Member> members,
- MembershipListener membershipListener) {
+ boolean atmostOnceMessageSemantics,
+ boolean preserverMsgOrder) {
this.channel = channel;
this.mode = mode;
this.applicationDomainMembershipManagers = applicationDomainMembershipManagers;
@@ -99,7 +99,8 @@
this.parameters = parameters;
this.localDomain = domain;
this.members = members;
- this.membershipListener = membershipListener;
+ this.atmostOnceMessageSemantics = atmostOnceMessageSemantics;
+ this.preserverMsgOrder = preserverMsgOrder;
}
/**
@@ -298,8 +299,8 @@
// Add a reliable failure detector
TcpFailureDetector tcpFailureDetector = new TcpFailureDetector();
// tcpFailureDetector.setPrevious(dfi); //TODO: check this
-// tcpFailureDetector.setReadTestTimeout(30000);
- tcpFailureDetector.setConnectTimeout(30000);
+ tcpFailureDetector.setReadTestTimeout(120000);
+ tcpFailureDetector.setConnectTimeout(60000);
channel.addInterceptor(tcpFailureDetector);
if (log.isDebugEnabled()) {
log.debug("Added TCP Failure Detector");
@@ -307,7 +308,7 @@
// Add the NonBlockingCoordinator.
// channel.addInterceptor(new Axis2Coordinator(membershipListener));
-
+
staticMembershipInterceptor = new StaticMembershipInterceptor();
staticMembershipInterceptor.setLocalMember(primaryMembershipManager.getLocalMember());
primaryMembershipManager.setStaticMembershipInterceptor(staticMembershipInterceptor);
@@ -319,20 +320,24 @@
channel.getMembershipService().setDomain(localDomain);
mode.addInterceptors(channel);
- // Add a AtMostOnceInterceptor to support at-most-once message processing semantics
- AtMostOnceInterceptor atMostOnceInterceptor = new AtMostOnceInterceptor();
- atMostOnceInterceptor.setOptionFlag(TribesConstants.AT_MOST_ONCE_OPTION);
- channel.addInterceptor(atMostOnceInterceptor);
- if (log.isDebugEnabled()) {
- log.debug("Added At-most-once Interceptor");
+ if (atmostOnceMessageSemantics) {
+ // Add a AtMostOnceInterceptor to support at-most-once message processing semantics
+ AtMostOnceInterceptor atMostOnceInterceptor = new AtMostOnceInterceptor();
+ atMostOnceInterceptor.setOptionFlag(TribesConstants.AT_MOST_ONCE_OPTION);
+ channel.addInterceptor(atMostOnceInterceptor);
+ if (log.isDebugEnabled()) {
+ log.debug("Added At-most-once Interceptor");
+ }
}
- // Add the OrderInterceptor to preserve sender ordering
- OrderInterceptor orderInterceptor = new OrderInterceptor();
- orderInterceptor.setOptionFlag(TribesConstants.MSG_ORDER_OPTION);
- channel.addInterceptor(orderInterceptor);
- if (log.isDebugEnabled()) {
- log.debug("Added Message Order Interceptor");
+ if (preserverMsgOrder) {
+ // Add the OrderInterceptor to preserve sender ordering
+ OrderInterceptor orderInterceptor = new OrderInterceptor();
+ orderInterceptor.setOptionFlag(TribesConstants.MSG_ORDER_OPTION);
+ channel.addInterceptor(orderInterceptor);
+ if (log.isDebugEnabled()) {
+ log.debug("Added Message Order Interceptor");
+ }
}
}
@@ -366,7 +371,7 @@
new RpcChannel(TribesUtil.getRpcMembershipChannelId(localDomain),
channel, new RpcMembershipRequestHandler(primaryMembershipManager,
this));
- if(log.isDebugEnabled()){
+ if (log.isDebugEnabled()) {
log.debug("Created primary membership channel " + new String(localDomain));
}
primaryMembershipManager.setRpcMembershipChannel(rpcMembershipChannel);
Modified: webservices/axis2/trunk/java/modules/kernel/conf/axis2.xml
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/kernel/conf/axis2.xml?rev=708551&r1=708550&r2=708551&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/kernel/conf/axis2.xml (original)
+++ webservices/axis2/trunk/java/modules/kernel/conf/axis2.xml Tue Oct 28 05:18:34 2008
@@ -359,6 +359,16 @@
<parameter name="localMemberPort">4000</parameter>
<!--
+ Preserve message ordering. This will be done according to sender order.
+ -->
+ <parameter name="preserveMessageOrder">true</parameter>
+
+ <!--
+ Maintain atmost-once message processing semantics
+ -->
+ <parameter name="atmostOnceMessageSemantics">true</parameter>
+
+ <!--
The list of static or well-known members. These entries will only be valid if the
"membershipScheme" above is set to "wka"
-->
Modified: webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/ClusteringConstants.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/ClusteringConstants.java?rev=708551&r1=708550&r2=708551&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/ClusteringConstants.java (original)
+++ webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/ClusteringConstants.java Tue Oct 28 05:18:34 2008
@@ -100,6 +100,16 @@
public static final String AVOID_INITIATION = "AvoidInitiation";
/**
+ * Preserve message ordering. This will be done according to sender order
+ */
+ public static final String PRESERVE_MSG_ORDER = "preserveMessageOrder";
+
+ /**
+ * Maintain atmost-once message processing semantics
+ */
+ public static final String ATMOST_ONCE_MSG_SEMANTICS = "atmostOnceMessageSemantics";
+
+ /**
* Indicates whether this member is ACTIVE or PASSIVE
*/
public static final String IS_ACTIVE = "isActive";