You are viewing a plain text version of this content. The canonical link for it is here.
Posted to axis-cvs@ws.apache.org by az...@apache.org on 2008/01/07 17:06:53 UTC

svn commit: r609656 - in /webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes: AckManager.java ChannelSender.java MembershipManager.java TribesClusterManager.java TribesMembershipListener.java TribesUtil.java

Author: azeez
Date: Mon Jan  7 08:06:51 2008
New Revision: 609656

URL: http://svn.apache.org/viewvc?rev=609656&view=rev
Log:
1. More improvements to membership management 
2. First try to get the config & state information from the longest living member of the group, when a node comes up


Added:
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/MembershipManager.java
Modified:
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/AckManager.java
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesMembershipListener.java
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesUtil.java

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/AckManager.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/AckManager.java?rev=609656&r1=609655&r2=609656&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/AckManager.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/AckManager.java Mon Jan  7 08:06:51 2008
@@ -45,7 +45,7 @@
      * When a particular member send an ACK for a particular message, the ACK is stored here
      *
      * @param messageUniqueId ID of the message being ACKed
-     * @param memberId The ID of the member who ACKed the above message
+     * @param memberId        The ID of the member who ACKed the above message
      */
     public static void addAcknowledgement(String messageUniqueId,
                                           String memberId) {
@@ -67,9 +67,9 @@
      * and then return false.
      *
      * @param messageUniqueId ID of the message being ACKed
-     * @param sender
+     * @param sender          The utility for sending the message
      * @return true - if all members have ACKed the message, false - otherwise
-     * @throws ClusteringFault
+     * @throws ClusteringFault If an error occurs while retransmitting a message
      */
     public static boolean isMessageAcknowledged(String messageUniqueId,
                                                 ChannelSender sender) throws ClusteringFault {
@@ -86,7 +86,7 @@
 
         // Check that all members in the memberList are same as the total member list,
         // which will indicate that all members have ACKed the message
-        Member[] members = sender.getChannel().getMembers();
+        Member[] members = MembershipManager.getMembers();
         if (members.length == 0) {
             isAcknowledged = true;
         } else {

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java?rev=609656&r1=609655&r2=609656&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java Mon Jan  7 08:06:51 2008
@@ -45,67 +45,46 @@
         }
         long timeToSend = 0;
 
-        // Keep retrying, since at the point of trying to send the msg, a member may leave the group
-        // causing a view change. All nodes in a view should get the msg
-        //TODO: Sometimes Tribes incorrectly detects that a member has left a group
-        while (true) {
-            if (channel.getMembers().length > 0) {
-                try {
-                    long start = System.currentTimeMillis();
-                    channel.send(channel.getMembers(), toByteMessage(msg), Channel.SEND_OPTIONS_USE_ACK);
-                    timeToSend = System.currentTimeMillis() - start;
-                    log.debug("Sent " + msg + " to group");
-                    break;
-                } catch (NotSerializableException e) {
-                    String message = "Could not send command message " + msg +
-                                     " to group since it is not serializable.";
-                    log.error(message, e);
-                    throw new ClusteringFault(message, e);
-                } catch (Exception e) {
-                    String message = "Error sending command message : " + msg +
-                                     ". Reason " + e.getMessage();
-                    log.warn(message, e);
-                }
-            } else {
-                break;
-            }
-        }
-        return timeToSend;
-    }
-
-    public long sendToGroup(ClusteringCommand msg, Member[] members) throws ClusteringFault {
-        if (channel == null) {
-            return 0;
-        }
-        long timeToSend = 0;
+        Member[] members = MembershipManager.getMembers();
 
         // Keep retrying, since at the point of trying to send the msg, a member may leave the group
         // causing a view change. All nodes in a view should get the msg
         //TODO: Sometimes Tribes incorrectly detects that a member has left a group
-        while (true) {
-            if (channel.getMembers().length > 0) {
-                try {
-                    long start = System.currentTimeMillis();
-                    channel.send(channel.getMembers(), toByteMessage(msg), Channel.SEND_OPTIONS_USE_ACK);
-                    timeToSend = System.currentTimeMillis() - start;
-                    log.debug("Sent " + msg + " to group");
-                    break;
-                } catch (NotSerializableException e) {
-                    String message = "Could not send command message " + msg +
-                                     " to group since it is not serializable.";
-                    log.error(message, e);
-                    throw new ClusteringFault(message, e);
-                } catch (ChannelException e) {
-                    
-                } catch (Exception e) {
-                    String message = "Error sending command message : " + msg +
-                                     ". Reason " + e.getMessage();
-                    log.warn(message, e);
+//        while (true) {
+        if (members.length > 0) {
+            try {
+                long start = System.currentTimeMillis();
+                channel.send(members, toByteMessage(msg), Channel.SEND_OPTIONS_USE_ACK);
+                timeToSend = System.currentTimeMillis() - start;
+                log.debug("Sent " + msg + " to group");
+//                    break;
+            } catch (NotSerializableException e) {
+                String message = "Could not send command message " + msg +
+                                 " to group since it is not serializable.";
+                log.error(message, e);
+                throw new ClusteringFault(message, e);
+            } catch (ChannelException e) {
+                //TODO: What to do for faulty members
+                ChannelException.FaultyMember[] faultyMembers = e.getFaultyMembers();
+                for (int i = 0; i < faultyMembers.length; i++) {
+                    ChannelException.FaultyMember faultyMember = faultyMembers[i];
+                    Member member = faultyMember.getMember();
+                    log.error("Member " + TribesUtil.getHost(member) + " is faulty. Cause " +
+                              faultyMember.getCause(), faultyMember.getCause());
+
+                    //TODO: Shall we try to resend to these members?
                 }
-            } else {
-                break;
+
+            } catch (Exception e) {
+                String message = "Error sending command message : " + msg +
+                                 ". Reason " + e.getMessage();
+                log.warn(message, e);
             }
         }
+//            else {
+//                break;
+//            }
+//        }
         return timeToSend;
     }
 
@@ -146,6 +125,10 @@
                              " since it is not serializable.";
             log.error(message, e);
             throw new ClusteringFault(message, e);
+        } catch (ChannelException e) {
+            ChannelException.FaultyMember[] faultyMembers = e.getFaultyMembers();
+            log.error("Member " + TribesUtil.getHost(member) + " is faulty. Cause " +
+                          faultyMembers[0].getCause(), faultyMembers[0].getCause());
         } catch (Exception e) {
             String message = "Could not send message to " + TribesUtil.getHost(member) +
                              ". Reason " + e.getMessage();

Added: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/MembershipManager.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/MembershipManager.java?rev=609656&view=auto
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/MembershipManager.java (added)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/MembershipManager.java Mon Jan  7 08:06:51 2008
@@ -0,0 +1,65 @@
+/*                                                                             
+ * Copyright 2004,2005 The Apache Software Foundation.                         
+ *                                                                             
+ * Licensed under the Apache License, Version 2.0 (the "License");             
+ * you may not use this file except in compliance with the License.            
+ * You may obtain a copy of the License at                                     
+ *                                                                             
+ *      http://www.apache.org/licenses/LICENSE-2.0                             
+ *                                                                             
+ * Unless required by applicable law or agreed to in writing, software         
+ * distributed under the License is distributed on an "AS IS" BASIS,           
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.    
+ * See the License for the specific language governing permissions and         
+ * limitations under the License.                                              
+ */
+package org.apache.axis2.clustering.tribes;
+
+import org.apache.catalina.tribes.Member;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+/**
+ * Responsible for managing the membership. All methods synchronize on this class.
+ */
+public class MembershipManager {
+    private static final List members = new ArrayList();
+
+    /** Records a member that has joined the group. */
+    public synchronized static void memberAdded(Member member) {
+        members.add(member);
+    }
+
+    /** Removes a member that has left the group. */
+    public synchronized static void memberDisappeared(Member member) {
+        members.remove(member);
+    }
+
+    /** @return a snapshot array of the currently known members; empty if there are none */
+    public synchronized static Member[] getMembers() {
+        return (Member[]) members.toArray(new Member[members.size()]);
+    }
+
+    /** @return the longest-alive member (the first member seeds the search), or null if none */
+    public synchronized static Member getLongestAliveMember() {
+        Member longestAliveMember = null;
+        for (int i = 0; i < members.size(); i++) {
+            Member member = (Member) members.get(i);
+            if (longestAliveMember == null ||
+                longestAliveMember.getMemberAliveTime() < member.getMemberAliveTime()) {
+                longestAliveMember = member;
+            }
+        }
+        return longestAliveMember;
+    }
+
+    /** @return a randomly chosen member, or null if there are no members */
+    public synchronized static Member getRandomMember() {
+        if (members.isEmpty()) {
+            return null;
+        }
+        return (Member) members.get(new Random().nextInt(members.size()));
+    }
+}

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java?rev=609656&r1=609655&r2=609656&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java Mon Jan  7 08:06:51 2008
@@ -22,7 +22,6 @@
 import org.apache.axiom.om.OMElement;
 import org.apache.axis2.AxisFault;
 import org.apache.axis2.clustering.ClusterManager;
-import org.apache.axis2.clustering.ClusteringCommand;
 import org.apache.axis2.clustering.ClusteringConstants;
 import org.apache.axis2.clustering.ClusteringFault;
 import org.apache.axis2.clustering.RequestBlockingHandler;
@@ -31,6 +30,7 @@
 import org.apache.axis2.clustering.context.ClusteringContextListener;
 import org.apache.axis2.clustering.context.ContextManager;
 import org.apache.axis2.clustering.context.DefaultContextManager;
+import org.apache.axis2.clustering.control.ControlCommand;
 import org.apache.axis2.clustering.control.GetConfigurationCommand;
 import org.apache.axis2.clustering.control.GetStateCommand;
 import org.apache.axis2.context.ConfigurationContext;
@@ -55,7 +55,6 @@
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Random;
 
 public class TribesClusterManager implements ClusterManager {
     private static final Log log = LogFactory.getLog(TribesClusterManager.class);
@@ -199,21 +198,21 @@
         }
         sender.setChannel(channel);
 
-        Member[] members = channel.getMembers();
+//        Member[] members = channel.getMembers();
         log.info("Local Tribes Member " + TribesUtil.getLocalHost(channel));
-        TribesUtil.printMembers(members);
+        TribesUtil.printMembers();
 
         // If configuration management is enabled, get the latest config from a neighbour  TODO: from the longest living neighbour
         if (configurationManager != null) {
             configurationManager.setSender(sender);
-            getInitializationMessage(members, sender, new GetConfigurationCommand());
+            getInitializationMessage(sender, new GetConfigurationCommand());
         }
 
         // If context replication is enabled, get the latest state from a neighbour  TODO: from the longest living neighbour
         if (contextManager != null) {
             contextManager.setSender(sender);
             channelListener.setContextManager(contextManager);
-            getInitializationMessage(members, sender, new GetStateCommand());
+            getInitializationMessage(sender, new GetStateCommand());
             ClusteringContextListener contextListener = new ClusteringContextListener(sender);
             configurationContext.addContextListener(contextListener);
         }
@@ -225,20 +224,18 @@
      * Get some information from a neighbour. This information will be used by this node to
      * initialize itself
      *
-     * @param members
-     * @param sender
-     * @param command
+     * @param sender  The utility for sending messages to the channel
+     * @param command The control command to send
      */
-    private void getInitializationMessage(Member[] members,
-                                          ChannelSender sender,
-                                          ClusteringCommand command) {
-        // If there is at least one member in the Tribe, get the current initialization info from a member
-        Random random = new Random();
+    private void getInitializationMessage(ChannelSender sender, ControlCommand command) {
+        // If there is at least one member in the cluster,
+        //  get the current initialization info from a member
         int numberOfTries = 0; // Don't keep on trying indefinitely
 
         // Keep track of members to whom we already sent an initialization command
         // Do not send another request to these members
         List sentMembersList = new ArrayList();
+        Member[] members = MembershipManager.getMembers();
         while (members.length > 0 &&
                configurationContext.
                        getPropertyNonReplicable(ClusteringConstants.CLUSTER_INITIALIZED) == null
@@ -246,13 +243,9 @@
 
             // While there are members and GetStateResponseCommand is not received do the following
             try {
-                members = channel.getMembers();
-
-                //TODO: Can get longest alive member, willdo with membership awareness
-                members[0].getMemberAliveTime();
-
-                int memberIndex = random.nextInt(members.length);
-                Member member = members[memberIndex];
+                Member member = (numberOfTries == 0) ?
+                                MembershipManager.getLongestAliveMember() : // First try to get from the longest alive member
+                                MembershipManager.getRandomMember(); // Else get from a random member
                 if (!sentMembersList.contains(TribesUtil.getHost(member))) {
                     long tts = sender.sendToMember(command, member);
                     configurationContext.
@@ -260,12 +253,13 @@
                                                      new Long(tts));
                     sentMembersList.add(TribesUtil.getHost(member));
                     log.debug("WAITING FOR STATE INITIALIZATION MESSAGE...");
-                    Thread.sleep(tts + 5);
+                    Thread.sleep(tts + 5 * (numberOfTries + 1));
                 }
             } catch (Exception e) {
                 log.error(e);
                 break;
             }
+            members = MembershipManager.getMembers();
             numberOfTries++;
         }
     }
@@ -334,9 +328,6 @@
 
     public boolean synchronizeAllMembers() {
         Parameter syncAllParam = getParameter(ClusteringConstants.SYNCHRONIZE_ALL_MEMBERS);
-        if (syncAllParam == null) {
-            return true;
-        }
-        return Boolean.parseBoolean((String) syncAllParam.getValue());
+        return syncAllParam == null || Boolean.parseBoolean((String) syncAllParam.getValue());
     }
 }

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesMembershipListener.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesMembershipListener.java?rev=609656&r1=609655&r2=609656&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesMembershipListener.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesMembershipListener.java Mon Jan  7 08:06:51 2008
@@ -32,11 +32,14 @@
 
     public void memberAdded(Member member) {
         log.info("New member " + TribesUtil.getHost(member) + " joined cluster.");
+        MembershipManager.memberAdded(member);
        //        System.err.println("++++++ IS COORD="+TribesClusterManager.nbc.isCoordinator());
     }
 
     public void memberDisappeared(Member member) {
         log.info("Member " + TribesUtil.getHost(member) + " left cluster");
+        MembershipManager.memberDisappeared(member);
+
 //        System.err.println("++++++ IS COORD="+TribesClusterManager.nbc.isCoordinator());
     }
 }

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesUtil.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesUtil.java?rev=609656&r1=609655&r2=609656&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesUtil.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesUtil.java Mon Jan  7 08:06:51 2008
@@ -28,7 +28,8 @@
 
     private static Log log = LogFactory.getLog(TribesUtil.class);
 
-    public static void printMembers(Member[] members) {
+    public static void printMembers() {
+        Member[] members = MembershipManager.getMembers();
         if (members != null) {
             int length = members.length;
             if (length > 0) {
@@ -56,20 +57,5 @@
 
     public static String getLocalHost(Channel channel) {
         return getHost(channel.getLocalMember(true));
-    }
-
-    public static Member getLongestAliveMember(Member[] members) {
-        Member longestAliveMember = null;
-        if (members.length > 0) {
-            long longestAliveTime = members[0].getMemberAliveTime();
-            for (int i = 0; i < members.length; i++) {
-                Member member = members[i];
-                if (longestAliveTime < member.getMemberAliveTime()) {
-                    longestAliveTime = member.getMemberAliveTime();
-                    longestAliveMember = member;
-                }
-            }
-        }
-        return longestAliveMember;
     }
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: axis-cvs-unsubscribe@ws.apache.org
For additional commands, e-mail: axis-cvs-help@ws.apache.org