You are viewing a plain text version of this content. The canonical link for it is here.
Posted to axis-cvs@ws.apache.org by az...@apache.org on 2008/05/25 23:58:59 UTC

svn commit: r660047 - /webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java

Author: azeez
Date: Sun May 25 14:58:58 2008
New Revision: 660047

URL: http://svn.apache.org/viewvc?rev=660047&view=rev
Log:
Retrying, if responses are not received for a JOIN message


Modified:
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java?rev=660047&r1=660046&r2=660047&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java Sun May 25 14:58:58 2008
@@ -176,14 +176,26 @@
         TribesUtil.printMembers(membershipManager);
 
         // If a WKA scheme is used, JOIN the group and get the member list
-        if (membershipScheme.equals(ClusteringConstants.MembershipScheme.WKA_BASED)) {
+        if (membershipScheme.equals(ClusteringConstants.MembershipScheme.WKA_BASED)
+            && membershipManager.getMembers().length > 0) {
             try {
+                log.info("Sending JOIN message to WKA members...");
                 Member[] wkaMembers = membershipManager.getMembers(); // The well-known members
-                Response[] responses = rpcChannel.send(wkaMembers,
-                                                       new JoinGroupCommand(),
-                                                       RpcChannel.FIRST_REPLY,
-                                                       Channel.SEND_OPTIONS_ASYNCHRONOUS,
-                                                       10000);
+                Response[] responses;
+                do {
+                    responses = rpcChannel.send(wkaMembers,
+                                                new JoinGroupCommand(),
+                                                RpcChannel.ALL_REPLY,
+                                                Channel.SEND_OPTIONS_ASYNCHRONOUS,
+                                                10000);
+                    if (responses.length == 0) {
+                        try {
+                            Thread.sleep(500);
+                        } catch (InterruptedException ignored) {
+                        }
+                    }
+                } while (responses.length == 0);  // Wait until we've received at least one response
+
                 for (Response response : responses) {
                     MemberListCommand command = (MemberListCommand) response.getMessage();
                     command.setMembershipManager(membershipManager);
@@ -215,14 +227,14 @@
         // If configuration management is enabled, get the latest config from a neighbour
         if (configurationManager != null) {
             configurationManager.setSender(channelSender);
-            initializeSystem(rpcChannel, new GetConfigurationCommand());
+            initializeSystem(new GetConfigurationCommand());
         }
 
         // If context replication is enabled, get the latest state from a neighbour
         if (contextManager != null) {
             contextManager.setSender(channelSender);
             channelListener.setContextManager(contextManager);
-            initializeSystem(rpcChannel, new GetStateCommand());
+            initializeSystem(new GetStateCommand());
             ClusteringContextListener contextListener = new ClusteringContextListener(channelSender);
             configurationContext.addContextListener(contextListener);
         }
@@ -650,13 +662,13 @@
     /**
      * Get some information from a neighbour. This information will be used by this node to
      * initialize itself
+     * <p/>
+     * rpcChannel is The utility for sending RPC style messages to the channel
      *
-     * @param rpcChannel The utility for sending RPC style messages to the channel
-     * @param command    The control command to send
+     * @param command The control command to send
      * @throws ClusteringFault If initialization code failed on this node
      */
-    private void initializeSystem(RpcChannel rpcChannel, ControlCommand command)
-            throws ClusteringFault {
+    private void initializeSystem(ControlCommand command) throws ClusteringFault {
         // If there is at least one member in the cluster,
         //  get the current initialization info from a member
         int numberOfTries = 0; // Don't keep on trying indefinitely
@@ -678,11 +690,20 @@
             log.info("Trying to send intialization request to " + memberHost);
             try {
                 if (!sentMembersList.contains(memberHost)) {
-                    Response[] responses = rpcChannel.send(new Member[]{member},
-                                                           command,
-                                                           RpcChannel.FIRST_REPLY,
-                                                           Channel.SEND_OPTIONS_ASYNCHRONOUS,
-                                                           10000);
+                    Response[] responses;
+                    do {
+                        responses = rpcChannel.send(new Member[]{member},
+                                                    command,
+                                                    RpcChannel.FIRST_REPLY,
+                                                    Channel.SEND_OPTIONS_ASYNCHRONOUS,
+                                                    10000);
+                        if (responses.length == 0) {
+                            try {
+                                Thread.sleep(500);
+                            } catch (InterruptedException ignored) {
+                            }
+                        }
+                    } while (responses.length == 0);
                     if (responses.length > 0) {
                         ((ControlCommand) responses[0].getMessage()).execute(configurationContext); // Do the initialization
                         break;