You are viewing a plain text version of this content. The canonical link for it is here.
Posted to java-commits@axis.apache.org by az...@apache.org on 2012/01/25 09:59:32 UTC
svn commit: r1235695 - in
/axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes:
MembershipManager.java WkaBasedMembershipScheme.java
Author: azeez
Date: Wed Jan 25 08:59:32 2012
New Revision: 1235695
URL: http://svn.apache.org/viewvc?rev=1235695&view=rev
Log:
Resend MEMBER_LIST message to WKA members who do not respond to the MEMBER_LIST message
Modified:
axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/MembershipManager.java
axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/WkaBasedMembershipScheme.java
Modified: axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/MembershipManager.java
URL: http://svn.apache.org/viewvc/axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/MembershipManager.java?rev=1235695&r1=1235694&r2=1235695&view=diff
==============================================================================
--- axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/MembershipManager.java (original)
+++ axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/MembershipManager.java Wed Jan 25 08:59:32 2012
@@ -37,6 +37,10 @@ import org.apache.commons.logging.LogFac
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
/**
* Responsible for managing the membership. Handles membership changes.
@@ -68,10 +72,22 @@ public class MembershipManager {
private final List<Member> wkaMembers = new ArrayList<Member>();
/**
+ * List of Well-Known members which have not responded to the MEMBER_LIST message.
+ * We need to retry sending the MEMBER_LIST message to these members until they respond,
+ * otherwise, we cannot be sure whether these WKA members added the members in the MEMBER_LIST
+ */
+ private final List<Member> nonRespondingWkaMembers = new CopyOnWriteArrayList<Member>();
+
+ /**
* The member representing this node
*/
private Member localMember;
+ /**
+ *
+ */
+ private boolean isMemberListResponseReceived;
+
public MembershipManager(ConfigurationContext configContext) {
this.configContext = configContext;
}
@@ -87,9 +103,10 @@ public class MembershipManager {
return rpcMembershipChannel;
}
- public void setStaticMembershipInterceptor(
- StaticMembershipInterceptor staticMembershipInterceptor) {
+ public void setupStaticMembershipManagement(StaticMembershipInterceptor staticMembershipInterceptor) {
this.staticMembershipInterceptor = staticMembershipInterceptor;
+ ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
+ scheduler.scheduleWithFixedDelay(new MemberListSenderTask(), 5, 5, TimeUnit.SECONDS);
}
public void setGroupManagementAgent(GroupManagementAgent groupManagementAgent) {
@@ -157,54 +174,93 @@ public class MembershipManager {
}
if (shouldAddMember) {
+ boolean wkaMemberBelongsToLocalDomain = true;
if (rpcMembershipChannel != null && isLocalMemberInitialized() &&
wkaMembers.contains(member)) { // if it is a well-known member
log.info("A WKA member " + TribesUtil.getName(member) +
" just joined the group. Sending MEMBER_LIST message.");
- // send the member list to it
- MemberListCommand memListCmd;
- try {
- memListCmd = new MemberListCommand();
- List<Member> members = new ArrayList<Member>(this.members);
- members.add(localMember); // Need to set the local member too
- memListCmd.setMembers(members.toArray(new Member[members.size()]));
-
- Response[] responses =
- rpcMembershipChannel.send(new Member[]{member}, memListCmd,
- RpcChannel.ALL_REPLY,
- Channel.SEND_OPTIONS_ASYNCHRONOUS |
- TribesConstants.MEMBERSHIP_MSG_OPTION, 10000);
-
- // Once a response is received from the WKA member to the MEMBER_LIST message,
- // if it does not belong to this domain, simply remove it from the members
- if (responses != null && responses.length > 0 && responses[0] != null) {
- Member source = responses[0].getSource();
- if (!TribesUtil.areInSameDomain(source, member)) {
- if (log.isDebugEnabled()) {
- log.debug("WKA Member " + TribesUtil.getName(source) +
- " does not belong to local domain " + new String(domain) +
- ". Hence removing it from the list.");
- }
- members.remove(member);
- return false;
+ wkaMemberBelongsToLocalDomain = sendMemberListToWellKnownMember(member);
+ }
+ if (wkaMemberBelongsToLocalDomain) {
+ members.add(member);
+ if (log.isDebugEnabled()) {
+ log.debug("Added group member " + TribesUtil.getName(member) + " to domain " +
+ new String(member.getDomain()));
+ }
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Task which send MEMBER_LIST messages to WKA members which have not yet responded to the
+ * MEMBER_LIST message
+ */
+ private class MemberListSenderTask implements Runnable {
+ public void run() {
+ try {
+ if (nonRespondingWkaMembers != null && !nonRespondingWkaMembers.isEmpty()) {
+ for (Member wkaMember : nonRespondingWkaMembers) {
+ if (wkaMember != null) {
+ sendMemberListToWellKnownMember(wkaMember);
}
}
- } catch (Exception e) {
- String errMsg = "Could not send MEMBER_LIST to well-known member " +
- TribesUtil.getName(member);
- log.error(errMsg, e);
- throw new RemoteProcessException(errMsg, e);
}
+ } catch (Throwable e) {
+ log.error("Could not send MemberList to WKA Members", e);
}
- members.add(member);
- if (log.isDebugEnabled()) {
- log.debug("Added group member " + TribesUtil.getName(member) + " to domain " +
- new String(member.getDomain()));
+ }
+ }
+
+ /**
+ * Send MEMBER_LIST message to WKA member
+ *
+ * @param wkaMember The WKA member to whom the MEMBER_LIST has to be sent
+ * @return true - if the WKA member belongs to the domain of this local member
+ */
+ private boolean sendMemberListToWellKnownMember(Member wkaMember) {
+ /*if (wkaMember.isFailing() || wkaMember.isSuspect()) {
+ return false;
+ }*/
+ // send the member list to it
+ MemberListCommand memListCmd;
+ try {
+ memListCmd = new MemberListCommand();
+ List<Member> members = new ArrayList<Member>(this.members);
+ members.add(localMember); // Need to set the local member too
+ memListCmd.setMembers(members.toArray(new Member[members.size()]));
+
+ Response[] responses =
+ rpcMembershipChannel.send(new Member[]{wkaMember}, memListCmd,
+ RpcChannel.ALL_REPLY,
+ Channel.SEND_OPTIONS_ASYNCHRONOUS |
+ TribesConstants.MEMBERSHIP_MSG_OPTION, 10000);
+
+ // Once a response is received from the WKA member to the MEMBER_LIST message,
+ // if it does not belong to this domain, simply remove it from the members
+ if (responses != null && responses.length > 0 && responses[0] != null) {
+ nonRespondingWkaMembers.remove(wkaMember);
+ Member source = responses[0].getSource();
+ if (!TribesUtil.areInSameDomain(source, wkaMember)) {
+ if (log.isDebugEnabled()) {
+ log.debug("WKA Member " + TribesUtil.getName(source) +
+ " does not belong to local domain " + new String(domain) +
+ ". Hence removing it from the list.");
+ }
+ return false;
+ }
+ } else { // No response from WKA member
+ nonRespondingWkaMembers.add(wkaMember);
}
- return true;
+ } catch (Exception e) {
+ String errMsg = "Could not send MEMBER_LIST to well-known member " +
+ TribesUtil.getName(wkaMember);
+ log.error(errMsg, e);
+ throw new RemoteProcessException(errMsg, e);
}
- return false;
+ return true;
}
/**
@@ -278,6 +334,7 @@ public class MembershipManager {
*/
public void memberDisappeared(Member member) {
members.remove(member);
+ nonRespondingWkaMembers.remove(member);
// Is this an application domain member?
if (groupManagementAgent != null) {
Modified: axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/WkaBasedMembershipScheme.java
URL: http://svn.apache.org/viewvc/axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/WkaBasedMembershipScheme.java?rev=1235695&r1=1235694&r2=1235695&view=diff
==============================================================================
--- axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/WkaBasedMembershipScheme.java (original)
+++ axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/WkaBasedMembershipScheme.java Wed Jan 25 08:59:32 2012
@@ -1,17 +1,17 @@
-/*
- * Copyright 2004,2005 The Apache Software Foundation.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+/*
+ * Copyright 2004,2005 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.axis2.clustering.tribes;
@@ -289,7 +289,7 @@ public class WkaBasedMembershipScheme im
log.debug("Adding Interceptors...");
}
TcpPingInterceptor tcpPingInterceptor = new TcpPingInterceptor();
- tcpPingInterceptor.setInterval(100);
+ tcpPingInterceptor.setInterval(10000);
channel.addInterceptor(tcpPingInterceptor);
if (log.isDebugEnabled()) {
log.debug("Added TCP Ping Interceptor");
@@ -299,7 +299,7 @@ public class WkaBasedMembershipScheme im
TcpFailureDetector tcpFailureDetector = new TcpFailureDetector();
// tcpFailureDetector.setPrevious(dfi); //TODO: check this
tcpFailureDetector.setReadTestTimeout(120000);
- tcpFailureDetector.setConnectTimeout(60000);
+ tcpFailureDetector.setConnectTimeout(180000);
channel.addInterceptor(tcpFailureDetector);
if (log.isDebugEnabled()) {
log.debug("Added TCP Failure Detector");
@@ -310,7 +310,7 @@ public class WkaBasedMembershipScheme im
staticMembershipInterceptor = new StaticMembershipInterceptor();
staticMembershipInterceptor.setLocalMember(primaryMembershipManager.getLocalMember());
- primaryMembershipManager.setStaticMembershipInterceptor(staticMembershipInterceptor);
+ primaryMembershipManager.setupStaticMembershipManagement(staticMembershipInterceptor);
channel.addInterceptor(staticMembershipInterceptor);
if (log.isDebugEnabled()) {
log.debug("Added Static Membership Interceptor");
@@ -350,7 +350,7 @@ public class WkaBasedMembershipScheme im
// Have multiple RPC channels with multiple RPC request handlers for each localDomain
// This is needed only when this member is running as a load balancer
for (MembershipManager appDomainMembershipManager : applicationDomainMembershipManagers) {
- appDomainMembershipManager.setStaticMembershipInterceptor(staticMembershipInterceptor);
+ appDomainMembershipManager.setupStaticMembershipManagement(staticMembershipInterceptor);
// Create an RpcChannel for each localDomain
String domain = new String(appDomainMembershipManager.getDomain());
@@ -377,7 +377,6 @@ public class WkaBasedMembershipScheme im
// Send JOIN message to a WKA member
if (primaryMembershipManager.getMembers().length > 0) {
- log.info("Sending JOIN message to WKA members...");
org.apache.catalina.tribes.Member[] wkaMembers = primaryMembershipManager.getMembers(); // The well-known members
/*try {
Thread.sleep(3000); // Wait for sometime so that the WKA members can receive the MEMBER_LIST message, if they have just joined the group
@@ -386,6 +385,7 @@ public class WkaBasedMembershipScheme im
Response[] responses = null;
do {
try {
+ log.info("Sending JOIN message to WKA members...");
responses = rpcMembershipChannel.send(wkaMembers,
new JoinGroupCommand(),
RpcChannel.ALL_REPLY,
@@ -394,10 +394,8 @@ public class WkaBasedMembershipScheme im
10000);
if (responses.length == 0) {
try {
- if (log.isDebugEnabled()) {
- log.debug("No responses received");
- }
- Thread.sleep(500);
+ log.info("No responses received from WKA members");
+ Thread.sleep(5000);
} catch (InterruptedException ignored) {
}
}