You are viewing a plain text version of this content. The canonical link for it is here.
Posted to axis-cvs@ws.apache.org by az...@apache.org on 2008/05/25 23:58:59 UTC
svn commit: r660047 -
/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java
Author: azeez
Date: Sun May 25 14:58:58 2008
New Revision: 660047
URL: http://svn.apache.org/viewvc?rev=660047&view=rev
Log:
Retrying, if responses are not received for a JOIN message
Modified:
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java?rev=660047&r1=660046&r2=660047&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java Sun May 25 14:58:58 2008
@@ -176,14 +176,26 @@
TribesUtil.printMembers(membershipManager);
// If a WKA scheme is used, JOIN the group and get the member list
- if (membershipScheme.equals(ClusteringConstants.MembershipScheme.WKA_BASED)) {
+ if (membershipScheme.equals(ClusteringConstants.MembershipScheme.WKA_BASED)
+ && membershipManager.getMembers().length > 0) {
try {
+ log.info("Sending JOIN message to WKA members...");
Member[] wkaMembers = membershipManager.getMembers(); // The well-known members
- Response[] responses = rpcChannel.send(wkaMembers,
- new JoinGroupCommand(),
- RpcChannel.FIRST_REPLY,
- Channel.SEND_OPTIONS_ASYNCHRONOUS,
- 10000);
+ Response[] responses;
+ do {
+ responses = rpcChannel.send(wkaMembers,
+ new JoinGroupCommand(),
+ RpcChannel.ALL_REPLY,
+ Channel.SEND_OPTIONS_ASYNCHRONOUS,
+ 10000);
+ if (responses.length == 0) {
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException ignored) {
+ }
+ }
+ } while (responses.length == 0); // Wait until we've received at least one response
+
for (Response response : responses) {
MemberListCommand command = (MemberListCommand) response.getMessage();
command.setMembershipManager(membershipManager);
@@ -215,14 +227,14 @@
// If configuration management is enabled, get the latest config from a neighbour
if (configurationManager != null) {
configurationManager.setSender(channelSender);
- initializeSystem(rpcChannel, new GetConfigurationCommand());
+ initializeSystem(new GetConfigurationCommand());
}
// If context replication is enabled, get the latest state from a neighbour
if (contextManager != null) {
contextManager.setSender(channelSender);
channelListener.setContextManager(contextManager);
- initializeSystem(rpcChannel, new GetStateCommand());
+ initializeSystem(new GetStateCommand());
ClusteringContextListener contextListener = new ClusteringContextListener(channelSender);
configurationContext.addContextListener(contextListener);
}
@@ -650,13 +662,13 @@
/**
* Get some information from a neighbour. This information will be used by this node to
* initialize itself
+ * <p/>
+ * rpcChannel is The utility for sending RPC style messages to the channel
*
- * @param rpcChannel The utility for sending RPC style messages to the channel
- * @param command The control command to send
+ * @param command The control command to send
* @throws ClusteringFault If initialization code failed on this node
*/
- private void initializeSystem(RpcChannel rpcChannel, ControlCommand command)
- throws ClusteringFault {
+ private void initializeSystem(ControlCommand command) throws ClusteringFault {
// If there is at least one member in the cluster,
// get the current initialization info from a member
int numberOfTries = 0; // Don't keep on trying indefinitely
@@ -678,11 +690,20 @@
log.info("Trying to send intialization request to " + memberHost);
try {
if (!sentMembersList.contains(memberHost)) {
- Response[] responses = rpcChannel.send(new Member[]{member},
- command,
- RpcChannel.FIRST_REPLY,
- Channel.SEND_OPTIONS_ASYNCHRONOUS,
- 10000);
+ Response[] responses;
+ do {
+ responses = rpcChannel.send(new Member[]{member},
+ command,
+ RpcChannel.FIRST_REPLY,
+ Channel.SEND_OPTIONS_ASYNCHRONOUS,
+ 10000);
+ if (responses.length == 0) {
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException ignored) {
+ }
+ }
+ } while (responses.length == 0);
if (responses.length > 0) {
((ControlCommand) responses[0].getMessage()).execute(configurationContext); // Do the initialization
break;