You are viewing a plain text version of this content. The canonical link for it is here.
Posted to java-commits@axis.apache.org by az...@apache.org on 2012/01/25 09:59:32 UTC

svn commit: r1235695 - in /axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes: MembershipManager.java WkaBasedMembershipScheme.java

Author: azeez
Date: Wed Jan 25 08:59:32 2012
New Revision: 1235695

URL: http://svn.apache.org/viewvc?rev=1235695&view=rev
Log:
Resend MEMBER_LIST message to WKA members who do not respond to the MEMBER_LIST message

Modified:
    axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/MembershipManager.java
    axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/WkaBasedMembershipScheme.java

Modified: axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/MembershipManager.java
URL: http://svn.apache.org/viewvc/axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/MembershipManager.java?rev=1235695&r1=1235694&r2=1235695&view=diff
==============================================================================
--- axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/MembershipManager.java (original)
+++ axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/MembershipManager.java Wed Jan 25 08:59:32 2012
@@ -37,6 +37,10 @@ import org.apache.commons.logging.LogFac
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Responsible for managing the membership. Handles membership changes.
@@ -68,10 +72,22 @@ public class MembershipManager {
     private final List<Member> wkaMembers = new ArrayList<Member>();
 
     /**
+     * List of well-known (WKA) members which have not responded to the MEMBER_LIST message.
+     * We need to retry sending the MEMBER_LIST message to these members until they respond;
+     * otherwise we cannot be sure whether these WKA members added the members in the MEMBER_LIST.
+     */
+    private final List<Member> nonRespondingWkaMembers = new CopyOnWriteArrayList<Member>();
+
+    /**
      * The member representing this node
      */
     private Member localMember;
 
+    /**
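+     * NOTE(review): presumably records whether a MEMBER_LIST response was received; unused in this diff - confirm.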
+     */
+    private boolean isMemberListResponseReceived;
+
     public MembershipManager(ConfigurationContext configContext) {
         this.configContext = configContext;
     }
@@ -87,9 +103,10 @@ public class MembershipManager {
         return rpcMembershipChannel;
     }
 
-    public void setStaticMembershipInterceptor(
-            StaticMembershipInterceptor staticMembershipInterceptor) {
+    public void setupStaticMembershipManagement(StaticMembershipInterceptor staticMembershipInterceptor) {
         this.staticMembershipInterceptor = staticMembershipInterceptor;
+        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
+        scheduler.scheduleWithFixedDelay(new MemberListSenderTask(), 5, 5, TimeUnit.SECONDS);
     }
 
     public void setGroupManagementAgent(GroupManagementAgent groupManagementAgent) {
@@ -157,54 +174,93 @@ public class MembershipManager {
         }
 
         if (shouldAddMember) {
+            boolean wkaMemberBelongsToLocalDomain = true;
             if (rpcMembershipChannel != null && isLocalMemberInitialized() &&
                 wkaMembers.contains(member)) { // if it is a well-known member
 
                 log.info("A WKA member " + TribesUtil.getName(member) +
                          " just joined the group. Sending MEMBER_LIST message.");
-                // send the member list to it
-                MemberListCommand memListCmd;
-                try {
-                    memListCmd = new MemberListCommand();
-                    List<Member> members = new ArrayList<Member>(this.members);
-                    members.add(localMember); // Need to set the local member too
-                    memListCmd.setMembers(members.toArray(new Member[members.size()]));
-
-                    Response[] responses =
-                            rpcMembershipChannel.send(new Member[]{member}, memListCmd,
-                                                      RpcChannel.ALL_REPLY,
-                                                      Channel.SEND_OPTIONS_ASYNCHRONOUS |
-                                                      TribesConstants.MEMBERSHIP_MSG_OPTION, 10000);
-
-                    // Once a response is received from the WKA member to the MEMBER_LIST message,
-                    // if it does not belong to this domain, simply remove it from the members
-                    if (responses != null && responses.length > 0 && responses[0] != null) {
-                        Member source = responses[0].getSource();
-                        if (!TribesUtil.areInSameDomain(source, member)) {
-                            if (log.isDebugEnabled()) {
-                                log.debug("WKA Member " + TribesUtil.getName(source) +
-                                          " does not belong to local domain " + new String(domain) +
-                                          ". Hence removing it from the list.");
-                            }
-                            members.remove(member);
-                            return false;
+                wkaMemberBelongsToLocalDomain = sendMemberListToWellKnownMember(member);
+            }
+            if (wkaMemberBelongsToLocalDomain) {
+                members.add(member);
+                if (log.isDebugEnabled()) {
+                    log.debug("Added group member " + TribesUtil.getName(member) + " to domain " +
+                              new String(member.getDomain()));
+                }
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Task which sends MEMBER_LIST messages to WKA members that have not yet responded to the
+     * MEMBER_LIST message
+     */
+    private class MemberListSenderTask implements Runnable {
+        public void run() {
+            try {
+                if (nonRespondingWkaMembers != null && !nonRespondingWkaMembers.isEmpty()) {
+                    for (Member wkaMember : nonRespondingWkaMembers) {
+                        if (wkaMember != null) {
+                            sendMemberListToWellKnownMember(wkaMember);
                         }
                     }
-                } catch (Exception e) {
-                    String errMsg = "Could not send MEMBER_LIST to well-known member " +
-                                    TribesUtil.getName(member);
-                    log.error(errMsg, e);
-                    throw new RemoteProcessException(errMsg, e);
                 }
+            } catch (Throwable e) {
+                log.error("Could not send MemberList to WKA Members", e);
             }
-            members.add(member);
-            if (log.isDebugEnabled()) {
-                log.debug("Added group member " + TribesUtil.getName(member) + " to domain " +
-                          new String(member.getDomain()));
+        }
+    }
+
+    /**
+     * Send MEMBER_LIST message to WKA member
+     *
+     * @param wkaMember The WKA member to whom the MEMBER_LIST has to be sent
+     * @return true - if the WKA member belongs to the domain of this local member
+     */
+    private boolean sendMemberListToWellKnownMember(Member wkaMember) {
+        /*if (wkaMember.isFailing() || wkaMember.isSuspect()) {
+            return false;
+        }*/
+        // send the member list to it
+        MemberListCommand memListCmd;
+        try {
+            memListCmd = new MemberListCommand();
+            List<Member> members = new ArrayList<Member>(this.members);
+            members.add(localMember); // Need to set the local member too
+            memListCmd.setMembers(members.toArray(new Member[members.size()]));
+
+            Response[] responses =
+                    rpcMembershipChannel.send(new Member[]{wkaMember}, memListCmd,
+                                              RpcChannel.ALL_REPLY,
+                                              Channel.SEND_OPTIONS_ASYNCHRONOUS |
+                                              TribesConstants.MEMBERSHIP_MSG_OPTION, 10000);
+
+            // Once a response is received from the WKA member to the MEMBER_LIST message,
+            // if it does not belong to this domain, return false so the caller does not add it
+            if (responses != null && responses.length > 0 && responses[0] != null) {
+                nonRespondingWkaMembers.remove(wkaMember);
+                Member source = responses[0].getSource();
+                if (!TribesUtil.areInSameDomain(source, wkaMember)) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("WKA Member " + TribesUtil.getName(source) +
+                                  " does not belong to local domain " + new String(domain) +
+                                  ". Hence removing it from the list.");
+                    }
+                    return false;
+                }
+            } else { // No response from WKA member
+                nonRespondingWkaMembers.add(wkaMember);
             }
-            return true;
+        } catch (Exception e) {
+            String errMsg = "Could not send MEMBER_LIST to well-known member " +
+                            TribesUtil.getName(wkaMember);
+            log.error(errMsg, e);
+            throw new RemoteProcessException(errMsg, e);
         }
-        return false;
+        return true;
     }
 
     /**
@@ -278,6 +334,7 @@ public class MembershipManager {
      */
     public void memberDisappeared(Member member) {
         members.remove(member);
+        nonRespondingWkaMembers.remove(member);
 
         // Is this an application domain member?
         if (groupManagementAgent != null) {

Modified: axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/WkaBasedMembershipScheme.java
URL: http://svn.apache.org/viewvc/axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/WkaBasedMembershipScheme.java?rev=1235695&r1=1235694&r2=1235695&view=diff
==============================================================================
--- axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/WkaBasedMembershipScheme.java (original)
+++ axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/WkaBasedMembershipScheme.java Wed Jan 25 08:59:32 2012
@@ -1,17 +1,17 @@
-/*                                                                             
- * Copyright 2004,2005 The Apache Software Foundation.                         
- *                                                                             
- * Licensed under the Apache License, Version 2.0 (the "License");             
- * you may not use this file except in compliance with the License.            
- * You may obtain a copy of the License at                                     
- *                                                                             
- *      http://www.apache.org/licenses/LICENSE-2.0                             
- *                                                                             
- * Unless required by applicable law or agreed to in writing, software         
- * distributed under the License is distributed on an "AS IS" BASIS,           
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.    
- * See the License for the specific language governing permissions and         
- * limitations under the License.                                              
+/*
+ * Copyright 2004,2005 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 package org.apache.axis2.clustering.tribes;
 
@@ -289,7 +289,7 @@ public class WkaBasedMembershipScheme im
             log.debug("Adding Interceptors...");
         }
         TcpPingInterceptor tcpPingInterceptor = new TcpPingInterceptor();
-        tcpPingInterceptor.setInterval(100);
+        tcpPingInterceptor.setInterval(10000);
         channel.addInterceptor(tcpPingInterceptor);
         if (log.isDebugEnabled()) {
             log.debug("Added TCP Ping Interceptor");
@@ -299,7 +299,7 @@ public class WkaBasedMembershipScheme im
         TcpFailureDetector tcpFailureDetector = new TcpFailureDetector();
 //        tcpFailureDetector.setPrevious(dfi); //TODO: check this
         tcpFailureDetector.setReadTestTimeout(120000);
-        tcpFailureDetector.setConnectTimeout(60000);
+        tcpFailureDetector.setConnectTimeout(180000);
         channel.addInterceptor(tcpFailureDetector);
         if (log.isDebugEnabled()) {
             log.debug("Added TCP Failure Detector");
@@ -310,7 +310,7 @@ public class WkaBasedMembershipScheme im
 
         staticMembershipInterceptor = new StaticMembershipInterceptor();
         staticMembershipInterceptor.setLocalMember(primaryMembershipManager.getLocalMember());
-        primaryMembershipManager.setStaticMembershipInterceptor(staticMembershipInterceptor);
+        primaryMembershipManager.setupStaticMembershipManagement(staticMembershipInterceptor);
         channel.addInterceptor(staticMembershipInterceptor);
         if (log.isDebugEnabled()) {
             log.debug("Added Static Membership Interceptor");
@@ -350,7 +350,7 @@ public class WkaBasedMembershipScheme im
         // Have multiple RPC channels with multiple RPC request handlers for each localDomain
         // This is needed only when this member is running as a load balancer
         for (MembershipManager appDomainMembershipManager : applicationDomainMembershipManagers) {
-            appDomainMembershipManager.setStaticMembershipInterceptor(staticMembershipInterceptor);
+            appDomainMembershipManager.setupStaticMembershipManagement(staticMembershipInterceptor);
 
             // Create an RpcChannel for each localDomain
             String domain = new String(appDomainMembershipManager.getDomain());
@@ -377,7 +377,6 @@ public class WkaBasedMembershipScheme im
 
         // Send JOIN message to a WKA member
         if (primaryMembershipManager.getMembers().length > 0) {
-            log.info("Sending JOIN message to WKA members...");
             org.apache.catalina.tribes.Member[] wkaMembers = primaryMembershipManager.getMembers(); // The well-known members
             /*try {
                 Thread.sleep(3000); // Wait for sometime so that the WKA members can receive the MEMBER_LIST message, if they have just joined the group
@@ -386,6 +385,7 @@ public class WkaBasedMembershipScheme im
             Response[] responses = null;
             do {
                 try {
+                    log.info("Sending JOIN message to WKA members...");
                     responses = rpcMembershipChannel.send(wkaMembers,
                                                           new JoinGroupCommand(),
                                                           RpcChannel.ALL_REPLY,
@@ -394,10 +394,8 @@ public class WkaBasedMembershipScheme im
                                                           10000);
                     if (responses.length == 0) {
                         try {
-                            if (log.isDebugEnabled()) {
-                                log.debug("No responses received");
-                            }
-                            Thread.sleep(500);
+                            log.info("No responses received from WKA members");
+                            Thread.sleep(5000);
                         } catch (InterruptedException ignored) {
                         }
                     }