You are viewing a plain text version of this content. The canonical link for it is here.
Posted to java-dev@axis.apache.org by az...@apache.org on 2008/06/25 18:42:20 UTC
svn commit: r671600 [1/2] - in
/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering:
./ control/wka/ tribes/
Author: azeez
Date: Wed Jun 25 09:42:19 2008
New Revision: 671600
URL: http://svn.apache.org/viewvc?rev=671600&view=rev
Log:
Added load balancing with WKA based membership discovery. In the process of adding this new functionality, I had to carry out major refactoring of the code.
Added:
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/MembershipScheme.java
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ApplicationMode.java
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/LoadBalancerMode.java
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/Mode.java
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/MulticastBasedMembershipScheme.java
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/WkaBasedMembershipScheme.java
Modified:
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/DefaultLoadBalanceEventHandler.java
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/MemberJoinedCommand.java
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/MemberListCommand.java
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/LoadBalancerInterceptor.java
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/MembershipManager.java
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/RpcRequestHandler.java
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesMembershipListener.java
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/DefaultLoadBalanceEventHandler.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/DefaultLoadBalanceEventHandler.java?rev=671600&r1=671599&r2=671600&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/DefaultLoadBalanceEventHandler.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/DefaultLoadBalanceEventHandler.java Wed Jun 25 09:42:19 2008
@@ -82,7 +82,7 @@
*/
private boolean canConnect(Member member) {
if(log.isDebugEnabled()){
- log.debug("Trying to connect to member " + member.getHostName() + "...");
+ log.debug("Trying to connect to member " + member + "...");
}
for (int retries = 30; retries > 0; retries--) {
try {
Added: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/MembershipScheme.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/MembershipScheme.java?rev=671600&view=auto
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/MembershipScheme.java (added)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/MembershipScheme.java Wed Jun 25 09:42:19 2008
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2004,2005 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.axis2.clustering;
+
+/**
+ * Represents a membership scheme, such as the "multicast based" or the "well-known address (WKA)
+ * based" scheme. This is directly related to the membership discovery mechanism.
+ */
+public interface MembershipScheme {
+
+ /**
+ * Initialize this membership scheme.
+ *
+ * @throws ClusteringFault If an error occurs while initializing
+ */
+ void init() throws ClusteringFault;
+
+ /**
+ * Join the group using this membership scheme.
+ *
+ * @throws ClusteringFault If an error occurs while joining the group
+ */
+ void joinGroup() throws ClusteringFault;
+}
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/MemberJoinedCommand.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/MemberJoinedCommand.java?rev=671600&r1=671599&r2=671600&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/MemberJoinedCommand.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/MemberJoinedCommand.java Wed Jun 25 09:42:19 2008
@@ -20,7 +20,6 @@
import org.apache.axis2.clustering.tribes.MembershipManager;
import org.apache.axis2.context.ConfigurationContext;
import org.apache.catalina.tribes.Member;
-import org.apache.catalina.tribes.group.interceptors.StaticMembershipInterceptor;
import java.util.Arrays;
@@ -33,27 +32,20 @@
private Member member;
private MembershipManager membershipManager;
- private StaticMembershipInterceptor staticMembershipInterceptor;
public void setMembershipManager(MembershipManager membershipManager) {
this.membershipManager = membershipManager;
}
- public void setStaticMembershipInterceptor(
- StaticMembershipInterceptor staticMembershipInterceptor) {
- this.staticMembershipInterceptor = staticMembershipInterceptor;
- }
-
public void setMember(Member member) {
this.member = member;
}
public void execute(ConfigurationContext configurationContext) throws ClusteringFault {
Member localMember = membershipManager.getLocalMember();
- if (!(Arrays.equals(localMember.getHost(), member.getHost()) &&
+ if (localMember == null || !(Arrays.equals(localMember.getHost(), member.getHost()) &&
localMember.getPort() == member.getPort())) {
membershipManager.memberAdded(member);
- staticMembershipInterceptor.memberAdded(member);
}
}
}
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/MemberListCommand.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/MemberListCommand.java?rev=671600&r1=671599&r2=671600&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/MemberListCommand.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/MemberListCommand.java Wed Jun 25 09:42:19 2008
@@ -21,7 +21,6 @@
import org.apache.axis2.clustering.tribes.TribesUtil;
import org.apache.axis2.context.ConfigurationContext;
import org.apache.catalina.tribes.Member;
-import org.apache.catalina.tribes.group.interceptors.StaticMembershipInterceptor;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -37,33 +36,22 @@
private static final Log log = LogFactory.getLog(MemberListCommand.class);
private Member[] members;
- private Member sender;
private MembershipManager membershipManager;
- private StaticMembershipInterceptor staticMembershipInterceptor;
public void setMembershipManager(MembershipManager membershipManager) {
this.membershipManager = membershipManager;
}
- public void setStaticMembershipInterceptor(StaticMembershipInterceptor staticMembershipInterceptor) {
- this.staticMembershipInterceptor = staticMembershipInterceptor;
- }
-
public void setMembers(Member[] members) {
this.members = members;
}
- public void setSender(Member sender) {
- this.sender = sender;
- }
-
public void execute(ConfigurationContext configurationContext) throws ClusteringFault {
log.info("Received MEMBER_LIST message");
Member localMember = membershipManager.getLocalMember();
for (Member member : members) {
addMember(localMember, member);
}
- addMember(localMember, sender);
}
private void addMember(Member localMember, Member member) {
@@ -72,7 +60,6 @@
localMember.getPort() == member.getPort())) {
log.info("Added member " + TribesUtil.getName(member));
membershipManager.memberAdded(member);
- staticMembershipInterceptor.memberAdded(member);
}
}
}
Added: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ApplicationMode.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ApplicationMode.java?rev=671600&view=auto
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ApplicationMode.java (added)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ApplicationMode.java Wed Jun 25 09:42:19 2008
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2004,2005 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.axis2.clustering.tribes;
+
+import org.apache.catalina.tribes.Channel;
+import org.apache.catalina.tribes.group.interceptors.DomainFilterInterceptor;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.util.List;
+import java.util.ArrayList;
+
+/**
+ * Represents a member running in application mode
+ */
+public class ApplicationMode implements Mode {
+
+ private static final Log log = LogFactory.getLog(ApplicationMode.class);
+ /** Domain passed to the DomainFilterInterceptor created in addInterceptors */
+ private byte[] loadBalancerDomain;
+ /** @param loadBalancerDomain The domain the filter interceptor is configured with */
+ public ApplicationMode(byte[] loadBalancerDomain) {
+ this.loadBalancerDomain = loadBalancerDomain;
+ }
+ /** Adds a DomainFilterInterceptor, flagged with MEMBERSHIP_MSG_OPTION, to the channel */
+ public void addInterceptors(Channel channel) {
+ DomainFilterInterceptor dfi = new DomainFilterInterceptor();
+ dfi.setOptionFlag(TribesConstants.MEMBERSHIP_MSG_OPTION);
+ dfi.setDomain(loadBalancerDomain);
+ channel.addInterceptor(dfi);
+ if (log.isDebugEnabled()) {
+ log.debug("Added Domain Filter Interceptor");
+ }
+ }
+ /** No channel initialization is required in application mode */
+ public void init(Channel channel) {
+ // Nothing to be done
+ }
+ /** Application mode maintains no MembershipManagers, so a new empty list is returned */
+ public List<MembershipManager> getMembershipManagers() {
+ return new ArrayList<MembershipManager>();
+ }
+}
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/LoadBalancerInterceptor.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/LoadBalancerInterceptor.java?rev=671600&r1=671599&r2=671600&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/LoadBalancerInterceptor.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/LoadBalancerInterceptor.java Wed Jun 25 09:42:19 2008
@@ -15,20 +15,11 @@
*/
package org.apache.axis2.clustering.tribes;
-import org.apache.axis2.clustering.LoadBalanceEventHandler;
import org.apache.catalina.tribes.ChannelMessage;
-import org.apache.catalina.tribes.Member;
import org.apache.catalina.tribes.group.ChannelInterceptorBase;
-import org.apache.catalina.tribes.membership.MemberImpl;
import org.apache.catalina.tribes.membership.Membership;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
import java.util.Arrays;
-import java.util.Map;
-import java.util.Properties;
/**
* This interceptor is used when this member is part of a load balancer cluster.
@@ -36,7 +27,6 @@
* another group.
*/
public class LoadBalancerInterceptor extends ChannelInterceptorBase {
- private static final Log log = LogFactory.getLog(LoadBalancerInterceptor.class);
/**
* Represents the load balancer group
@@ -48,146 +38,15 @@
*/
protected byte[] loadBalancerDomain = new byte[0];
- /**
- * Represents the group in which the applications being load balanced, are deployed and their
- * respective load balancer event handlers
- */
- private Map<String, LoadBalanceEventHandler> lbEventHandlers;
-
- public LoadBalancerInterceptor(byte[] loadBalancerDomain,
- Map<String, LoadBalanceEventHandler> lbEventHandlers) {
+ public LoadBalancerInterceptor(byte[] loadBalancerDomain) {
this.loadBalancerDomain = loadBalancerDomain;
- this.lbEventHandlers = lbEventHandlers;
- }
-
- public void setLbEventHandlers(Map<String, LoadBalanceEventHandler> lbEventHandlers) {
- this.lbEventHandlers = lbEventHandlers;
}
public void messageReceived(ChannelMessage msg) {
-
// Ignore all messages which are not intended for the load balancer group
- if (Arrays.equals(msg.getAddress().getDomain(), loadBalancerDomain)) {
+ if (okToProcess(msg.getOptions()) ||
+ Arrays.equals(msg.getAddress().getDomain(), loadBalancerDomain)) {
super.messageReceived(msg);
}
}
-
- public void memberAdded(Member member) {
- if (loadBalancerMembership == null) {
- setupMembership();
- }
- boolean notify;
- synchronized (loadBalancerMembership) {
- notify = Arrays.equals(loadBalancerDomain, member.getDomain());
- if (notify) {
- notify = loadBalancerMembership.memberAlive((MemberImpl) member);
- }
- }
- if (notify) {
- super.memberAdded(member);
-
- }
-
- // Is this an application domain member?
- for (String applicationDomain : lbEventHandlers.keySet()) {
- if (Arrays.equals(applicationDomain.getBytes(), member.getDomain())) {
- log.info("Application member " + TribesUtil.getName(member) + " joined group " +
- applicationDomain);
- LoadBalanceEventHandler eventHandler = lbEventHandlers.get(applicationDomain);
- if (eventHandler != null) {
- eventHandler.applicationMemberAdded(toAxis2Member(member));
- }
- break;
- }
- }
- }
-
- private org.apache.axis2.clustering.Member toAxis2Member(Member member) {
- org.apache.axis2.clustering.Member axis2Member =
- new org.apache.axis2.clustering.Member(TribesUtil.getHost(member),
- member.getPort());
- Properties props = getProperties(member.getPayload());
-
- String http = props.getProperty("HTTP");
- if (http != null && http.trim().length() != 0) {
- axis2Member.setHttpPort(Integer.parseInt(http));
- }
-
- String https = props.getProperty("HTTPS");
- if (https != null && https.trim().length() != 0) {
- axis2Member.setHttpsPort(Integer.parseInt(https));
- }
-
- return axis2Member;
- }
-
- private Properties getProperties(byte[] payload) {
- Properties props = null;
- try {
- ByteArrayInputStream bin = new ByteArrayInputStream(payload);
- props = new Properties();
- props.load(bin);
- } catch (IOException ignored) {
- // This error will never occur
- }
- return props;
- }
-
- public void memberDisappeared(Member member) {
- if (loadBalancerMembership == null) {
- setupMembership();
- }
- boolean notify;
- synchronized (loadBalancerMembership) {
- notify = Arrays.equals(loadBalancerDomain, member.getDomain());
- loadBalancerMembership.removeMember((MemberImpl) member);
- }
- if (notify) {
- super.memberDisappeared(member);
- }
-
- // Is this an application domain member?
- for (String applicationDomain : lbEventHandlers.keySet()) {
- if (Arrays.equals(applicationDomain.getBytes(), member.getDomain())) {
- log.info("Application member " + TribesUtil.getName(member) + " left group " +
- applicationDomain);
- LoadBalanceEventHandler eventHandler = lbEventHandlers.get(applicationDomain);
- if (eventHandler != null) {
- eventHandler.applicationMemberRemoved(toAxis2Member(member));
- }
- break;
- }
- }
- }
-
- public boolean hasMembers() {
- if (loadBalancerMembership == null) {
- setupMembership();
- }
- return loadBalancerMembership.hasMembers();
- }
-
- public Member[] getMembers() {
- if (loadBalancerMembership == null) {
- setupMembership();
- }
- return loadBalancerMembership.getMembers();
- }
-
- public Member getMember(Member mbr) {
- if (loadBalancerMembership == null) {
- setupMembership();
- }
- return loadBalancerMembership.getMember(mbr);
- }
-
- public Member getLocalMember(boolean incAlive) {
- return super.getLocalMember(incAlive);
- }
-
- protected synchronized void setupMembership() {
- if (loadBalancerMembership == null) {
- loadBalancerMembership = new Membership((MemberImpl) super.getLocalMember(true));
- }
- }
}
Added: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/LoadBalancerMode.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/LoadBalancerMode.java?rev=671600&view=auto
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/LoadBalancerMode.java (added)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/LoadBalancerMode.java Wed Jun 25 09:42:19 2008
@@ -0,0 +1,81 @@
+/*
+ * Copyright 2004,2005 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.axis2.clustering.tribes;
+
+import org.apache.axis2.clustering.LoadBalanceEventHandler;
+import org.apache.catalina.tribes.Channel;
+import org.apache.catalina.tribes.MembershipListener;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Represents a member running in load balance mode
+ */
+public class LoadBalancerMode implements Mode {
+
+ private static final Log log = LogFactory.getLog(LoadBalancerMode.class);
+
+ private byte[] loadBalancerDomain;
+ private Map<String, LoadBalanceEventHandler> lbEventHandlers; // keyed by domain name, see init()
+ private List<MembershipManager> membershipManagers = new ArrayList<MembershipManager>();
+
+ public LoadBalancerMode(byte[] loadBalancerDomain,
+ Map<String, LoadBalanceEventHandler> lbEventHandlers) {
+ this.loadBalancerDomain = loadBalancerDomain;
+ this.lbEventHandlers = lbEventHandlers;
+ }
+ /** Adds a LoadBalancerInterceptor, flagged with MEMBERSHIP_MSG_OPTION, to the channel */
+ public void addInterceptors(Channel channel) {
+ LoadBalancerInterceptor lbInterceptor =
+ new LoadBalancerInterceptor(loadBalancerDomain);
+ lbInterceptor.setOptionFlag(TribesConstants.MEMBERSHIP_MSG_OPTION);
+ channel.addInterceptor(lbInterceptor);
+ if (log.isDebugEnabled()) {
+ log.debug("Added Load Balancer Interceptor");
+ }
+ }
+ /** Creates one MembershipManager and one channel MembershipListener per handler domain */
+ public void init(Channel channel) {
+ // Create one MembershipManager, fed by its own MembershipListener, for each domain that
+ // has a load balance event handler. Needed only when this member runs as a load balancer
+ for (Object o : lbEventHandlers.keySet()) {
+ String domain = (String) o;
+ final MembershipManager membershipManager = new MembershipManager();
+ membershipManager.setDomain(domain.getBytes());
+ membershipManager.setLoadBalanceEventHandler(lbEventHandlers.get(domain));
+ // Forward the channel's membership events to this domain's MembershipManager
+ MembershipListener membershipListener = new MembershipListener() {
+ public void memberAdded(org.apache.catalina.tribes.Member member) {
+ membershipManager.memberAdded(member);
+ }
+
+ public void memberDisappeared(org.apache.catalina.tribes.Member member) {
+ membershipManager.memberDisappeared(member);
+ }
+ };
+ channel.addMembershipListener(membershipListener);
+ membershipManagers.add(membershipManager);
+ }
+ }
+ /** Returns the MembershipManagers created by init(); the list is empty until init() has run */
+ public List<MembershipManager> getMembershipManagers() {
+ return membershipManagers;
+ }
+}
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/MembershipManager.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/MembershipManager.java?rev=671600&r1=671599&r2=671600&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/MembershipManager.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/MembershipManager.java Wed Jun 25 09:42:19 2008
@@ -19,17 +19,25 @@
package org.apache.axis2.clustering.tribes;
+import org.apache.axis2.clustering.ClusteringConstants;
+import org.apache.axis2.clustering.LoadBalanceEventHandler;
import org.apache.axis2.clustering.control.wka.MemberListCommand;
+import org.apache.axis2.context.ConfigurationContext;
import org.apache.catalina.tribes.Channel;
import org.apache.catalina.tribes.Member;
import org.apache.catalina.tribes.RemoteProcessException;
import org.apache.catalina.tribes.group.RpcChannel;
+import org.apache.catalina.tribes.group.interceptors.StaticMembershipInterceptor;
import org.apache.catalina.tribes.membership.MemberImpl;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
+import java.util.Properties;
import java.util.Random;
/**
@@ -40,6 +48,18 @@
private static final Log log = LogFactory.getLog(MembershipManager.class);
private RpcChannel rpcChannel;
+ private StaticMembershipInterceptor staticMembershipInterceptor;
+
+ /**
+ * The domain corresponding to the membership handled by this MembershipManager
+ */
+ private byte[] domain;
+ private LoadBalanceEventHandler loadBalanceEventHandler;
+ private ConfigurationContext configContext;
+
+ public MembershipManager(ConfigurationContext configContext) {
+ this.configContext = configContext;
+ }
public MembershipManager() {
}
@@ -48,6 +68,23 @@
this.rpcChannel = rpcChannel;
}
+ public void setStaticMembershipInterceptor(
+ StaticMembershipInterceptor staticMembershipInterceptor) {
+ this.staticMembershipInterceptor = staticMembershipInterceptor;
+ }
+
+ public void setLoadBalanceEventHandler(LoadBalanceEventHandler loadBalanceEventHandler) {
+ this.loadBalanceEventHandler = loadBalanceEventHandler;
+ }
+
+ public void setDomain(byte[] domain) {
+ this.domain = domain;
+ }
+
+ public byte[] getDomain() {
+ return domain;
+ }
+
/**
* List of current members in the cluster. Only the members who are alive will be in this
* list
@@ -84,19 +121,47 @@
* A new member is added
*
* @param member The new member that joined the cluster
- * @return true - if the member was added to the <code>members</code> array; false, otherwise.
+ * @return true If the member was added to the <code>members</code> array; false, otherwise.
*/
public synchronized boolean memberAdded(Member member) {
- if (!members.contains(member)) {
- if (rpcChannel != null && wkaMembers.contains(member)) { // if it is a well-known member
+
+ // If this member already exists or if the member belongs to another domain,
+ // there is no need to add it
+ if(members.contains(member) || !Arrays.equals(domain, member.getDomain())){
+ return false;
+ }
+
+ if (staticMembershipInterceptor != null) { // this interceptor is null when multicast based scheme is used
+ staticMembershipInterceptor.addStaticMember(member);
+ if (log.isDebugEnabled()) {
+ log.debug("Added static member " + TribesUtil.getName(member) +
+ " from domain " + new String(member.getDomain()));
+ }
+ }
+
+ boolean shouldAddMember = localMember == null ||
+ Arrays.equals(localMember.getDomain(), member.getDomain());
+
+ // If this member is a load balancer, notify the respective load balance event handler?
+ if (loadBalanceEventHandler != null) {
+ log.info("Application member " + TribesUtil.getName(member) + " joined group " +
+ new String(member.getDomain()));
+ loadBalanceEventHandler.applicationMemberAdded(toAxis2Member(member));
+ }
+
+ if (shouldAddMember) {
+ if (rpcChannel != null && isLocalMemberInitialized() &&
+ wkaMembers.contains(member)) { // if it is a well-known member
log.info("A WKA member " + TribesUtil.getName(member) +
" just joined the group. Sending MEMBER_LIST message.");
- // send the memeber list to it
+ // send the member list to it
MemberListCommand memListCmd;
try {
memListCmd = new MemberListCommand();
- memListCmd.setMembers(getMembers());
+ List<Member> members = new ArrayList<Member>(this.members);
+ members.add(localMember); // Need to set the local member too
+ memListCmd.setMembers(members.toArray(new Member[members.size()]));
rpcChannel.send(new Member[]{member}, memListCmd, RpcChannel.ALL_REPLY,
Channel.SEND_OPTIONS_ASYNCHRONOUS, 10000);
} catch (Exception e) {
@@ -107,11 +172,55 @@
}
}
members.add(member);
+ if (log.isDebugEnabled()) {
+ log.debug("Added group member " + TribesUtil.getName(member) + " to domain " +
+ new String(member.getDomain()));
+ }
return true;
}
return false;
}
+ private org.apache.axis2.clustering.Member toAxis2Member(Member member) {
+ org.apache.axis2.clustering.Member axis2Member =
+ new org.apache.axis2.clustering.Member(TribesUtil.getHost(member),
+ member.getPort());
+ Properties props = getProperties(member.getPayload());
+ // Copy the optional HTTP/HTTPS ports from the payload; values are trimmed before parsing
+ String http = props.getProperty("HTTP");
+ if (http != null && http.trim().length() != 0) {
+ axis2Member.setHttpPort(Integer.parseInt(http.trim()));
+ }
+ // Properties.load() keeps trailing whitespace, which Integer.parseInt() would reject
+ String https = props.getProperty("HTTPS");
+ if (https != null && https.trim().length() != 0) {
+ axis2Member.setHttpsPort(Integer.parseInt(https.trim()));
+ }
+
+ return axis2Member;
+ }
+
+ private Properties getProperties(byte[] payload) { // loads the payload bytes into a Properties
+ Properties props = null;
+ try {
+ ByteArrayInputStream bin = new ByteArrayInputStream(payload);
+ props = new Properties();
+ props.load(bin);
+ } catch (IOException ignored) {
+ // Not expected to occur: the stream only reads from the in-memory payload array
+ }
+ return props;
+ }
+
+ private boolean isLocalMemberInitialized() {
+ if (configContext == null) { // no ConfigurationContext was supplied (no-arg constructor)
+ return false;
+ }
+ Object clusterInitialized =
+ configContext.getPropertyNonReplicable(ClusteringConstants.CLUSTER_INITIALIZED);
+ return clusterInitialized != null && clusterInitialized.equals("true"); // String "true" only
+ }
+
/**
* A member disappeared
*
@@ -119,6 +228,11 @@
*/
public synchronized void memberDisappeared(Member member) {
members.remove(member);
+
+ // Is this an application domain member?
+ if (loadBalanceEventHandler != null) {
+ loadBalanceEventHandler.applicationMemberRemoved(toAxis2Member(member));
+ }
}
/**
Added: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/Mode.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/Mode.java?rev=671600&view=auto
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/Mode.java (added)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/Mode.java Wed Jun 25 09:42:19 2008
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2004,2005 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.axis2.clustering.tribes;
+
+import org.apache.catalina.tribes.Channel;
+
+import java.util.List;
+
+/**
+ * The mode in which this member is running, such as loadBalance or application
+ */
+public interface Mode {
+
+ /**
+ * Add channel interceptors
+ *
+ * @param channel The Channel to which interceptors need to be added
+ */
+ public void addInterceptors(Channel channel);
+
+ /**
+ * Initialize this mode
+ *
+ * @param channel The channel related to this member
+ */
+ void init(Channel channel);
+ /** Returns the MembershipManagers maintained by this mode (may be an empty list) */
+ List<MembershipManager> getMembershipManagers();
+}
Added: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/MulticastBasedMembershipScheme.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/MulticastBasedMembershipScheme.java?rev=671600&view=auto
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/MulticastBasedMembershipScheme.java (added)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/MulticastBasedMembershipScheme.java Wed Jun 25 09:42:19 2008
@@ -0,0 +1,183 @@
+/*
+ * Copyright 2004,2005 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.axis2.clustering.tribes;
+
+import org.apache.axis2.clustering.ClusteringConstants;
+import org.apache.axis2.clustering.ClusteringFault;
+import org.apache.axis2.clustering.MembershipScheme;
+import org.apache.axis2.description.Parameter;
+import org.apache.axis2.util.Utils;
+import org.apache.catalina.tribes.ManagedChannel;
+import org.apache.catalina.tribes.group.interceptors.OrderInterceptor;
+import org.apache.catalina.tribes.group.interceptors.TcpFailureDetector;
+import org.apache.catalina.tribes.transport.ReceiverBase;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.net.SocketException;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Implementation of the multicast based membership scheme. In this scheme, membership is discovered
+ * using multicasts
+ */
+public class MulticastBasedMembershipScheme implements MembershipScheme {
+
+ private static final Log log = LogFactory.getLog(MulticastBasedMembershipScheme.class);
+
+ /**
+ * The Tribes channel
+ */
+ private ManagedChannel channel;
+ private Map<String, Parameter> parameters;
+
+ /**
+ * The domain to which this node belongs
+ */
+ private byte[] domain;
+
+ /**
+ * The mode in which this member operates such as "loadBalance" or "application"
+ */
+ private Mode mode;
+
+ public MulticastBasedMembershipScheme(ManagedChannel channel,
+ Mode mode,
+ Map<String, Parameter> parameters,
+ byte[] domain) {
+ this.channel = channel;
+ this.mode = mode;
+ this.parameters = parameters;
+ this.domain = domain;
+ }
+
+ public void init() throws ClusteringFault {
+ addInterceptors();
+ configureMulticastParameters();
+ }
+
+ public void joinGroup() throws ClusteringFault {
+ // Nothing to do
+ }
+
+ private void configureMulticastParameters() throws ClusteringFault {
+ Properties mcastProps = channel.getMembershipService().getProperties();
+ Parameter mcastAddress = getParameter(TribesConstants.MCAST_ADDRESS);
+ if (mcastAddress != null) {
+ mcastProps.setProperty(TribesConstants.MCAST_ADDRESS,
+ ((String) mcastAddress.getValue()).trim());
+ }
+ Parameter mcastBindAddress = getParameter(TribesConstants.MCAST_BIND_ADDRESS);
+ if (mcastBindAddress != null) {
+ mcastProps.setProperty(TribesConstants.MCAST_BIND_ADDRESS,
+ ((String) mcastBindAddress.getValue()).trim());
+ }
+
+ Parameter mcastPort = getParameter(TribesConstants.MCAST_PORT);
+ if (mcastPort != null) {
+ mcastProps.setProperty(TribesConstants.MCAST_PORT,
+ ((String) mcastPort.getValue()).trim());
+ }
+ Parameter mcastFrequency = getParameter(TribesConstants.MCAST_FREQUENCY);
+ if (mcastFrequency != null) {
+ mcastProps.setProperty(TribesConstants.MCAST_FREQUENCY,
+ ((String) mcastFrequency.getValue()).trim());
+ }
+ Parameter mcastMemberDropTime = getParameter(TribesConstants.MEMBER_DROP_TIME);
+ if (mcastMemberDropTime != null) {
+ mcastProps.setProperty(TribesConstants.MEMBER_DROP_TIME,
+ ((String) mcastMemberDropTime.getValue()).trim());
+ }
+
+ // Set the IP address that will be advertised by this node
+ ReceiverBase receiver = (ReceiverBase) channel.getChannelReceiver();
+ Parameter tcpListenHost = getParameter(TribesConstants.LOCAL_MEMBER_HOST);
+ if (tcpListenHost != null) {
+ String host = ((String) tcpListenHost.getValue()).trim();
+ mcastProps.setProperty(TribesConstants.TCP_LISTEN_HOST, host);
+ mcastProps.setProperty(TribesConstants.BIND_ADDRESS, host);
+ receiver.setAddress(host);
+ } else {
+ String host;
+ try {
+ host = Utils.getIpAddress();
+ } catch (SocketException e) {
+ String msg = "Could not get local IP address";
+ log.error(msg, e);
+ throw new ClusteringFault(msg, e);
+ }
+ mcastProps.setProperty(TribesConstants.TCP_LISTEN_HOST, host);
+ mcastProps.setProperty(TribesConstants.BIND_ADDRESS, host);
+ receiver.setAddress(host);
+ }
+ String localIP = System.getProperty(ClusteringConstants.LOCAL_IP_ADDRESS);
+ if (localIP != null) {
+ receiver.setAddress(localIP);
+ }
+
+ Parameter tcpListenPort = getParameter(TribesConstants.LOCAL_MEMBER_PORT);
+ if (tcpListenPort != null) {
+ String port = ((String) tcpListenPort.getValue()).trim();
+ mcastProps.setProperty(TribesConstants.TCP_LISTEN_PORT, port);
+ receiver.setPort(Integer.parseInt(port));
+ }
+
+ mcastProps.setProperty(TribesConstants.MCAST_CLUSTER_DOMAIN, new String(domain));
+ }
+
+ /**
+ * Add ChannelInterceptors. The order of the interceptors that are added will depend on the
+ * membership management scheme
+ */
+ private void addInterceptors() {
+
+ if (log.isDebugEnabled()) {
+ log.debug("Adding Interceptors...");
+ }
+
+ // Add a reliable failure detector
+ TcpFailureDetector tcpFailureDetector = new TcpFailureDetector();
+ tcpFailureDetector.setConnectTimeout(30000);
+ channel.addInterceptor(tcpFailureDetector);
+ if (log.isDebugEnabled()) {
+ log.debug("Added TCP Failure Detector");
+ }
+
+ channel.getMembershipService().setDomain(domain);
+ mode.addInterceptors(channel);
+
+ // Add an AtMostOnceInterceptor to support at-most-once message processing semantics
+ AtMostOnceInterceptor atMostOnceInterceptor = new AtMostOnceInterceptor();
+ atMostOnceInterceptor.setOptionFlag(TribesConstants.AT_MOST_ONCE_OPTION);
+ channel.addInterceptor(atMostOnceInterceptor);
+ if (log.isDebugEnabled()) {
+ log.debug("Added At-most-once Interceptor");
+ }
+
+ // Add the OrderInterceptor to preserve sender ordering
+ OrderInterceptor orderInterceptor = new OrderInterceptor();
+ orderInterceptor.setOptionFlag(TribesConstants.MSG_ORDER_OPTION);
+ channel.addInterceptor(orderInterceptor);
+ if (log.isDebugEnabled()) {
+ log.debug("Added Message Order Interceptor");
+ }
+ }
+
+ public Parameter getParameter(String name) {
+ return parameters.get(name);
+ }
+}
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/RpcRequestHandler.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/RpcRequestHandler.java?rev=671600&r1=671599&r2=671600&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/RpcRequestHandler.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/RpcRequestHandler.java Wed Jun 25 09:42:19 2008
@@ -32,7 +32,6 @@
import org.apache.catalina.tribes.Member;
import org.apache.catalina.tribes.RemoteProcessException;
import org.apache.catalina.tribes.group.RpcCallback;
-import org.apache.catalina.tribes.group.interceptors.StaticMembershipInterceptor;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -46,14 +45,11 @@
private static Log log = LogFactory.getLog(RpcRequestHandler.class);
private ConfigurationContext configurationContext;
private MembershipManager membershipManager;
- private StaticMembershipInterceptor staticMembershipInterceptor;
public RpcRequestHandler(ConfigurationContext configurationContext,
- MembershipManager membershipManager,
- StaticMembershipInterceptor staticMembershipInterceptor) {
+ MembershipManager membershipManager) {
this.configurationContext = configurationContext;
this.membershipManager = membershipManager;
- this.staticMembershipInterceptor = staticMembershipInterceptor;
}
public void setConfigurationContext(ConfigurationContext configurationContext) {
@@ -104,27 +100,17 @@
}
} else if (msg instanceof JoinGroupCommand) {
log.info("Received JOIN message from " + TribesUtil.getName(invoker));
- MemberListCommand memListCmd;
- try {
- // Add the member
- staticMembershipInterceptor.memberAdded(invoker);
- membershipManager.memberAdded(invoker);
-
- // Return the list of current members to the caller
- memListCmd = new MemberListCommand();
- memListCmd.setMembers(membershipManager.getMembers());
- } catch (Exception e) {
- String errMsg = "Cannot handle JOIN request";
- log.error(errMsg, e);
- throw new RemoteProcessException(errMsg, e);
- }
+ membershipManager.memberAdded(invoker);
+
+ // Return the list of current members to the caller
+ MemberListCommand memListCmd = new MemberListCommand();
+ memListCmd.setMembers(membershipManager.getMembers());
return memListCmd;
} else if (msg instanceof MemberJoinedCommand) {
log.info("Received MEMBER_JOINED message from " + TribesUtil.getName(invoker));
try {
MemberJoinedCommand command = (MemberJoinedCommand) msg;
command.setMembershipManager(membershipManager);
- command.setStaticMembershipInterceptor(staticMembershipInterceptor);
command.execute(configurationContext);
} catch (ClusteringFault e) {
String errMsg = "Cannot handle MEMBER_JOINED notification";
@@ -135,8 +121,6 @@
try { //TODO: What if we receive more than one member list message?
MemberListCommand command = (MemberListCommand) msg;
command.setMembershipManager(membershipManager);
- command.setStaticMembershipInterceptor(staticMembershipInterceptor);
- command.setSender(invoker);
command.execute(configurationContext);
//TODO Send MEMBER_JOINED messages to all nodes
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java?rev=671600&r1=671599&r2=671600&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java Wed Jun 25 09:42:19 2008
@@ -21,11 +21,11 @@
import org.apache.axiom.om.OMElement;
import org.apache.axis2.AxisFault;
-import org.apache.axis2.util.Utils;
import org.apache.axis2.clustering.ClusterManager;
import org.apache.axis2.clustering.ClusteringConstants;
import org.apache.axis2.clustering.ClusteringFault;
import org.apache.axis2.clustering.LoadBalanceEventHandler;
+import org.apache.axis2.clustering.MembershipScheme;
import org.apache.axis2.clustering.RequestBlockingHandler;
import org.apache.axis2.clustering.configuration.ConfigurationManager;
import org.apache.axis2.clustering.configuration.DefaultConfigurationManager;
@@ -35,9 +35,6 @@
import org.apache.axis2.clustering.control.ControlCommand;
import org.apache.axis2.clustering.control.GetConfigurationCommand;
import org.apache.axis2.clustering.control.GetStateCommand;
-import org.apache.axis2.clustering.control.wka.JoinGroupCommand;
-import org.apache.axis2.clustering.control.wka.MemberJoinedCommand;
-import org.apache.axis2.clustering.control.wka.MemberListCommand;
import org.apache.axis2.context.ConfigurationContext;
import org.apache.axis2.description.HandlerDescription;
import org.apache.axis2.description.Parameter;
@@ -53,28 +50,14 @@
import org.apache.catalina.tribes.group.GroupChannel;
import org.apache.catalina.tribes.group.Response;
import org.apache.catalina.tribes.group.RpcChannel;
-import org.apache.catalina.tribes.group.interceptors.DomainFilterInterceptor;
-import org.apache.catalina.tribes.group.interceptors.OrderInterceptor;
-import org.apache.catalina.tribes.group.interceptors.StaticMembershipInterceptor;
-import org.apache.catalina.tribes.group.interceptors.TcpFailureDetector;
-import org.apache.catalina.tribes.group.interceptors.TcpPingInterceptor;
-import org.apache.catalina.tribes.membership.StaticMember;
import org.apache.catalina.tribes.transport.MultiPointSender;
-import org.apache.catalina.tribes.transport.ReceiverBase;
import org.apache.catalina.tribes.transport.ReplicationTransmitter;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.ServerSocket;
-import java.net.Socket;
-import java.net.SocketAddress;
-import java.net.SocketException;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -98,7 +81,16 @@
private ChannelSender channelSender;
private MembershipManager membershipManager;
private RpcRequestHandler rpcRequestHandler;
- private StaticMembershipInterceptor staticMembershipInterceptor;
+ private MembershipScheme membershipScheme;
+
+ /**
+ * The mode in which this member operates such as "loadBalance" or "application"
+ */
+ private Mode mode;
+
+ /**
+ * Static members
+ */
private List<org.apache.axis2.clustering.Member> members;
private final Map<String, LoadBalanceEventHandler> lbEventHandlers =
@@ -145,30 +137,30 @@
public void init() throws ClusteringFault {
log.info("Initializing cluster...");
addRequestBlockingHandlerToInFlows();
- membershipManager = new MembershipManager();
+ membershipManager = new MembershipManager(configurationContext);
+
channel = new GroupChannel();
channelSender = new ChannelSender(channel, membershipManager, synchronizeAllMembers());
channelListener =
new ChannelListener(configurationContext, configurationManager, contextManager);
-
- setMaximumRetries();
-
- String membershipScheme = getMembershipScheme();
- log.info("Using " + membershipScheme + " based membership management scheme");
+ channel.addChannelListener(channelListener);
byte[] domain = getClusterDomain();
log.info("Cluster domain: " + new String(domain));
+ membershipManager.setDomain(domain);
- // Add all the ChannelInterceptors
- addInterceptors(channel, domain, membershipScheme);
-
- // Membership scheme handling
- // If it is a WKA scheme, connect to a WKA and get a list of members. Add the members
- // to the MembershipManager
- configureMembershipScheme(domain, membershipScheme);
-
- channel.addChannelListener(channelListener);
+ // RpcChannel is a ChannelListener. When the reply to a particular request comes back, it
+ // picks it up. Each RPC is given a UUID, hence can correlate the request-response pair
+ rpcRequestHandler = new RpcRequestHandler(configurationContext, membershipManager);
+ rpcChannel = new RpcChannel(domain, channel, rpcRequestHandler);
+ if (log.isDebugEnabled()) {
+ log.debug("Created RPC Channel for domain " + new String(domain));
+ }
+ membershipManager.setRpcChannel(rpcChannel);
+ setMaximumRetries();
+ configureMode(domain);
+ configureMembershipScheme(domain, mode.getMembershipManagers());
setMemberTransportInfo();
TribesMembershipListener membershipListener = new TribesMembershipListener(membershipManager);
@@ -188,72 +180,10 @@
throw new ClusteringFault(msg, e);
}
- // RpcChannel is a ChannelListener. When the reply to a particular request comes back, it
- // picks it up. Each RPC is given a UUID, hence can correlate the request-response pair
- rpcRequestHandler = new RpcRequestHandler(configurationContext,
- membershipManager,
- staticMembershipInterceptor);
- rpcChannel = new RpcChannel(domain, channel, rpcRequestHandler);
- membershipManager.setRpcChannel(rpcChannel);
-
-
log.info("Local Member " + TribesUtil.getLocalHost(channel));
TribesUtil.printMembers(membershipManager);
- // If a WKA scheme is used, JOIN the group and get the member list
- if (membershipScheme.equals(ClusteringConstants.MembershipScheme.WKA_BASED)
- && membershipManager.getMembers().length > 0) {
- log.info("Sending JOIN message to WKA members...");
- Member[] wkaMembers = membershipManager.getMembers(); // The well-known members
- try {
- Thread.sleep(3000); // Wait for sometime so that the WKA members can receive the MEMBER_LIST message, if they have just joined the group
- } catch (InterruptedException ignored) {
- }
- Response[] responses = null;
- do {
- try {
- responses = rpcChannel.send(wkaMembers,
- new JoinGroupCommand(),
- RpcChannel.ALL_REPLY,
- Channel.SEND_OPTIONS_ASYNCHRONOUS,
- 10000);
- if (responses.length == 0) {
- try {
- Thread.sleep(500);
- } catch (InterruptedException ignored) {
- }
- }
- } catch (Exception e) {
- String msg = "Error occurred while trying to send JOIN request to WKA members";
- log.error(msg, e);
- }
-
- // TODO: If we do not get a response within some time, try to recover from this fault
- }
- while (responses == null || responses.length == 0); // Wait until we've received at least one response
-
- for (Response response : responses) {
- MemberListCommand command = (MemberListCommand) response.getMessage();
- command.setMembershipManager(membershipManager);
- command.setStaticMembershipInterceptor(staticMembershipInterceptor);
- command.setSender(response.getSource());
- command.execute(configurationContext);
- }
-
- if (membershipManager.getMembers().length > 0) {
- log.info("Sending MEMBER_JOINED to group...");
- MemberJoinedCommand memberJoinedCommand = new MemberJoinedCommand();
- memberJoinedCommand.setMember(membershipManager.getLocalMember());
- try {
- rpcChannel.send(membershipManager.getMembers(), memberJoinedCommand,
- RpcChannel.ALL_REPLY, Channel.SEND_OPTIONS_ASYNCHRONOUS, 10000);
- } catch (ChannelException e) {
- String msg = "Could not send MEMBER_JOINED message to group";
- log.error(msg, e);
- throw new ClusteringFault(msg, e);
- }
- }
- }
+ membershipScheme.joinGroup();
// If configuration management is enabled, get the latest config from a neighbour
if (configurationManager != null) {
@@ -409,362 +339,42 @@
}
}
+ private void configureMode(byte[] domain) {
+ if (loadBalanceMode) {
+ mode = new LoadBalancerMode(domain, lbEventHandlers);
+ } else {
+ mode = new ApplicationMode(domain);
+ }
+ mode.init(channel);
+ }
+
/**
* Handle specific configurations related to different membership management schemes.
*
- * @param domain The clustering loadBalancerDomain to which this member belongs to
- * @param membershipScheme The membership scheme. Only wka & multicast are valid values.
+ * @param domain The clustering loadBalancerDomain to which this member belongs
+ * @param membershipManagers MembershipManagers for different domains
* @throws ClusteringFault If the membership scheme is invalid, or if an error occurs
* while configuring membership scheme
*/
- private void configureMembershipScheme(byte[] domain, String membershipScheme)
+ private void configureMembershipScheme(byte[] domain,
+ List<MembershipManager> membershipManagers)
throws ClusteringFault {
-
- if (membershipScheme.equals(ClusteringConstants.MembershipScheme.WKA_BASED)) {
- configureWkaBasedMembership(domain);
- } else if (membershipScheme.equals(ClusteringConstants.MembershipScheme.MULTICAST_BASED)) {
- configureMulticastBasedMembership(channel, domain);
+ String scheme = getMembershipScheme();
+ log.info("Using " + scheme + " based membership management scheme");
+ if (scheme.equals(ClusteringConstants.MembershipScheme.WKA_BASED)) {
+ membershipScheme = new WkaBasedMembershipScheme(channel, mode,
+ membershipManagers,
+ rpcChannel, membershipManager,
+ parameters, domain, members);
+ } else if (scheme.equals(ClusteringConstants.MembershipScheme.MULTICAST_BASED)) {
+ membershipScheme = new MulticastBasedMembershipScheme(channel, mode, parameters, domain);
} else {
- String msg = "Invalid membership scheme '" + membershipScheme +
+ String msg = "Invalid membership scheme '" + scheme +
"'. Supported schemes are multicast & wka";
log.error(msg);
throw new ClusteringFault(msg);
}
- }
-
- /**
- * Configure the membership related to the WKA based scheme
- *
- * @param domain The loadBalancerDomain to which the members belong to
- * @throws ClusteringFault If an error occurs while configuring this scheme
- */
- private void configureWkaBasedMembership(byte[] domain) throws ClusteringFault {
- channel.setMembershipService(new WkaMembershipService(membershipManager));
- StaticMember localMember = new StaticMember();
- membershipManager.setLocalMember(localMember);
- ReceiverBase receiver = (ReceiverBase) channel.getChannelReceiver();
-
- // ------------ START: Configure and add the local member ---------------------
- Parameter localHost = getParameter(TribesConstants.LOCAL_MEMBER_HOST);
- String host;
- if (localHost != null) {
- host = ((String) localHost.getValue()).trim();
- } else { // In cases where the localhost needs to be automatically figured out
- try {
- try {
- host = Utils.getIpAddress();
- } catch (SocketException e) {
- String msg = "Could not get local IP address";
- log.error(msg, e);
- throw new ClusteringFault(msg, e);
- }
- } catch (Exception e) {
- String msg = "Could not get the localhost name";
- log.error(msg, e);
- throw new ClusteringFault(msg, e);
- }
- }
- receiver.setAddress(host);
- try {
- localMember.setHostname(host);
- } catch (IOException e) {
- String msg = "Could not set the local member's name";
- log.error(msg, e);
- throw new ClusteringFault(msg, e);
- }
-
- Parameter localPort = getParameter(TribesConstants.LOCAL_MEMBER_PORT);
- int port;
- try {
- if (localPort != null) {
- port = Integer.parseInt(((String) localPort.getValue()).trim());
- port = getLocalPort(new ServerSocket(), localMember.getHostname(), port, 4000, 100);
- } else { // In cases where the localport needs to be automatically figured out
- port = getLocalPort(new ServerSocket(), localMember.getHostname(), -1, 4000, 100);
- }
- } catch (IOException e) {
- String msg =
- "Could not allocate the specified port or a port in the range 4000-4100 " +
- "for local host " + localMember.getHostname() +
- ". Check whether the IP address specified or inferred for the local " +
- "member is correct.";
- log.error(msg, e);
- throw new ClusteringFault(msg, e);
- }
-
- byte[] payload = "ping".getBytes();
- localMember.setPayload(payload);
- receiver.setPort(port);
- localMember.setPort(port);
- localMember.setDomain(domain);
- staticMembershipInterceptor.setLocalMember(localMember);
-
- // ------------ END: Configure and add the local member ---------------------
-
- // ------------ START: Add other members ---------------------
- for (org.apache.axis2.clustering.Member member : members) {
- StaticMember tribesMember;
- try {
- tribesMember = new StaticMember(member.getHostName(), member.getPort(),
- 0, payload);
- } catch (IOException e) {
- String msg = "Could not add static member " +
- member.getHostName() + ":" + member.getPort();
- log.error(msg, e);
- throw new ClusteringFault(msg, e);
- }
-
- // Do not add the local member to the list of members
- if (!(Arrays.equals(localMember.getHost(), tribesMember.getHost()) &&
- localMember.getPort() == tribesMember.getPort())) {
- tribesMember.setDomain(domain);
-
- // We will add the member even if it is offline at this moment. When the
- // member comes online, it will be detected by the GMS
- staticMembershipInterceptor.addStaticMember(tribesMember);
- membershipManager.addWellKnownMember(tribesMember);
- if (canConnect(member)) {
- membershipManager.memberAdded(tribesMember);
- log.info("Added static member " + TribesUtil.getName(tribesMember));
- } else {
- log.info("Could not connect to member " + TribesUtil.getName(tribesMember));
- }
- }
- }
- }
-
- /**
- * Before adding a static member, we will try to verify whether we can connect to it
- *
- * @param member The member whose connectvity needs to be verified
- * @return true, if the member can be contacted; false, otherwise.
- */
- private boolean canConnect(org.apache.axis2.clustering.Member member) {
- for (int retries = 5; retries > 0; retries--) {
- try {
- InetAddress addr = InetAddress.getByName(member.getHostName());
- SocketAddress sockaddr = new InetSocketAddress(addr,
- member.getPort());
- new Socket().connect(sockaddr, 500);
- return true;
- } catch (IOException e) {
- String msg = e.getMessage();
- if (msg.indexOf("Connection refused") == -1 && msg.indexOf("connect timed out") == -1) {
- log.error("Cannot connect to member " +
- member.getHostName() + ":" + member.getPort(), e);
- }
- }
- }
- return false;
- }
-
- protected int getLocalPort(ServerSocket socket, String hostname,
- int preferredPort, int portstart, int retries) throws IOException {
- if (preferredPort != -1) {
- try {
- return getLocalPort(socket, hostname, preferredPort);
- } catch (IOException ignored) {
- // Fall through and try a default port
- }
- }
- InetSocketAddress addr = null;
- if (retries > 0) {
- try {
- return getLocalPort(socket, hostname, portstart);
- } catch (IOException x) {
- retries--;
- if (retries <= 0) {
- log.error("Unable to bind server socket to:" + addr + " throwing error.");
- throw x;
- }
- portstart++;
- try {
- Thread.sleep(50);
- } catch (InterruptedException ignored) {
- ignored.printStackTrace();
- }
- getLocalPort(socket, hostname, portstart, retries, -1);
- }
- }
- return portstart;
- }
-
- private int getLocalPort(ServerSocket socket, String hostname, int port) throws IOException {
- InetSocketAddress addr;
- addr = new InetSocketAddress(hostname, port);
- socket.bind(addr);
- log.info("Receiver Server Socket bound to:" + addr);
- socket.setSoTimeout(5);
- socket.close();
- try {
- Thread.sleep(100);
- } catch (InterruptedException ignored) {
- ignored.printStackTrace();
- }
- return port;
- }
-
- /**
- * Add ChannelInterceptors. The order of the interceptors that are added will depend on the
- * membership management scheme
- *
- * @param channel The Tribes channel
- * @param domain The loadBalancerDomain to which this node belongs to
- * @param membershipScheme The membership scheme. Only wka & multicast are valid values.
- * @throws ClusteringFault If an error occurs while adding interceptors
- */
- private void addInterceptors(ManagedChannel channel,
- byte[] domain,
- String membershipScheme) throws ClusteringFault {
-
- if (log.isDebugEnabled()) {
- log.debug("Adding Interceptors...");
- }
- if (membershipScheme.equals(ClusteringConstants.MembershipScheme.WKA_BASED)) {
- TcpPingInterceptor tcpPingInterceptor = new TcpPingInterceptor();
- tcpPingInterceptor.setInterval(100);
- channel.addInterceptor(tcpPingInterceptor);
- if (log.isDebugEnabled()) {
- log.debug("Added TCP Ping Interceptor");
- }
- }
-
- // Add the NonBlockingCoordinator. This is used for leader election
- /*nbc = new NonBlockingCoordinator() {
- public void fireInterceptorEvent(InterceptorEvent event) {
- String status = event.getEventTypeDesc();
- System.err.println("$$$$$$$$$$$$ NBC status=" + status);
- int type = event.getEventType();
- }
- };
- nbc.setPrevious(dfi);
- channel.addInterceptor(nbc);*/
-
- // Add a reliable failure detector
- TcpFailureDetector tcpFailureDetector = new TcpFailureDetector();
-// tcpFailureDetector.setPrevious(dfi); //TODO: check this
-// tcpFailureDetector.setReadTestTimeout(30000);
- tcpFailureDetector.setConnectTimeout(30000);
- channel.addInterceptor(tcpFailureDetector);
- if (log.isDebugEnabled()) {
- log.debug("Added TCP Failure Detector");
- }
-
- // Add a DomainFilterInterceptor
- channel.getMembershipService().setDomain(domain);
- if (!loadBalanceMode) {
- DomainFilterInterceptor dfi = new DomainFilterInterceptor();
- dfi.setDomain(domain);
- channel.addInterceptor(dfi);
- if (log.isDebugEnabled()) {
- log.debug("Added Domain Filter Interceptor");
- }
- } else {
- LoadBalancerInterceptor lbInterceptor =
- new LoadBalancerInterceptor(domain, lbEventHandlers);
- channel.addInterceptor(lbInterceptor);
- if (log.isDebugEnabled()) {
- log.debug("Added Load Balancer Interceptor");
- }
- }
-
- // Add a AtMostOnceInterceptor to support at-most-once message processing semantics
- AtMostOnceInterceptor atMostOnceInterceptor = new AtMostOnceInterceptor();
- atMostOnceInterceptor.setOptionFlag(TribesConstants.AT_MOST_ONCE_OPTION);
- channel.addInterceptor(atMostOnceInterceptor);
- if (log.isDebugEnabled()) {
- log.debug("Added At-most-once Interceptor");
- }
-
- // Add the OrderInterceptor to preserve sender ordering
- OrderInterceptor orderInterceptor = new OrderInterceptor();
- orderInterceptor.setOptionFlag(TribesConstants.MSG_ORDER_OPTION);
- channel.addInterceptor(orderInterceptor);
- if (log.isDebugEnabled()) {
- log.debug("Added Message Order Interceptor");
- }
-
- if (membershipScheme.equals(ClusteringConstants.MembershipScheme.WKA_BASED)) {
- staticMembershipInterceptor = new StaticMembershipInterceptor();
- channel.addInterceptor(staticMembershipInterceptor);
- if (log.isDebugEnabled()) {
- log.debug("Added Static Membership Interceptor");
- }
- }
- }
-
- /**
- * If a multicast based membership management scheme is used, configure the multicasting related
- * parameters
- *
- * @param channel The Tribes channel
- * @param domain The clustering loadBalancerDomain to which this node belongs to
- * @throws ClusteringFault If an error occurs while obtaining the local host address
- */
- private void configureMulticastBasedMembership(ManagedChannel channel,
- byte[] domain) throws ClusteringFault {
- Properties mcastProps = channel.getMembershipService().getProperties();
- Parameter mcastAddress = getParameter(TribesConstants.MCAST_ADDRESS);
- if (mcastAddress != null) {
- mcastProps.setProperty(TribesConstants.MCAST_ADDRESS,
- ((String) mcastAddress.getValue()).trim());
- }
- Parameter mcastBindAddress = getParameter(TribesConstants.MCAST_BIND_ADDRESS);
- if (mcastBindAddress != null) {
- mcastProps.setProperty(TribesConstants.MCAST_BIND_ADDRESS,
- ((String) mcastBindAddress.getValue()).trim());
- }
-
- Parameter mcastPort = getParameter(TribesConstants.MCAST_PORT);
- if (mcastPort != null) {
- mcastProps.setProperty(TribesConstants.MCAST_PORT,
- ((String) mcastPort.getValue()).trim());
- }
- Parameter mcastFrequency = getParameter(TribesConstants.MCAST_FREQUENCY);
- if (mcastFrequency != null) {
- mcastProps.setProperty(TribesConstants.MCAST_FREQUENCY,
- ((String) mcastFrequency.getValue()).trim());
- }
- Parameter mcastMemberDropTime = getParameter(TribesConstants.MEMBER_DROP_TIME);
- if (mcastMemberDropTime != null) {
- mcastProps.setProperty(TribesConstants.MEMBER_DROP_TIME,
- ((String) mcastMemberDropTime.getValue()).trim());
- }
-
- // Set the IP address that will be advertised by this node
- ReceiverBase receiver = (ReceiverBase) channel.getChannelReceiver();
- Parameter tcpListenHost = getParameter(TribesConstants.LOCAL_MEMBER_HOST);
- if (tcpListenHost != null) {
- String host = ((String) tcpListenHost.getValue()).trim();
- mcastProps.setProperty(TribesConstants.TCP_LISTEN_HOST, host);
- mcastProps.setProperty(TribesConstants.BIND_ADDRESS, host);
- receiver.setAddress(host);
- } else {
- String host;
- try {
- host = Utils.getIpAddress();
- } catch (SocketException e) {
- String msg = "Could not get local IP address";
- log.error(msg, e);
- throw new ClusteringFault(msg, e);
- }
- mcastProps.setProperty(TribesConstants.TCP_LISTEN_HOST, host);
- mcastProps.setProperty(TribesConstants.BIND_ADDRESS, host);
- receiver.setAddress(host);
- }
- String localIP = System.getProperty(ClusteringConstants.LOCAL_IP_ADDRESS);
- if (localIP != null) {
- receiver.setAddress(localIP);
- }
-
- Parameter tcpListenPort = getParameter(TribesConstants.LOCAL_MEMBER_PORT);
- if (tcpListenPort != null) {
- String port = ((String) tcpListenPort.getValue()).trim();
- mcastProps.setProperty(TribesConstants.TCP_LISTEN_PORT, port);
- receiver.setPort(Integer.parseInt(port));
- }
-
- mcastProps.setProperty(TribesConstants.MCAST_CLUSTER_DOMAIN, new String(domain));
+ membershipScheme.init();
}
/**
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesMembershipListener.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesMembershipListener.java?rev=671600&r1=671599&r2=671600&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesMembershipListener.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesMembershipListener.java Wed Jun 25 09:42:19 2008
@@ -24,8 +24,8 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-/** In
- *
+/**
+ * Membership changes are notified using this class
*/
public class TribesMembershipListener implements MembershipListener {
@@ -37,7 +37,6 @@
}
public void memberAdded(Member member) {
-
if (membershipManager.memberAdded(member)) {
log.info("New member " + TribesUtil.getName(member) + " joined cluster.");
}