You are viewing a plain text version of this content. The canonical link for it is here.
Posted to axis-cvs@ws.apache.org by az...@apache.org on 2008/05/21 23:20:49 UTC
svn commit: r658880 - in
/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering:
control/wka/ tribes/
Author: azeez
Date: Wed May 21 14:20:48 2008
New Revision: 658880
URL: http://svn.apache.org/viewvc?rev=658880&view=rev
Log:
When a new member joins by notifying a WKA member, this WKA member should notify the other members
Added:
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/RpcRequestHandler.java
- copied, changed from r658777, webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/InitializationRequestHandler.java
Removed:
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/InitializationRequestHandler.java
Modified:
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/LeaveGroupCommand.java
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/MemberJoinedCommand.java
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/MemberListCommand.java
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/LeaveGroupCommand.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/LeaveGroupCommand.java?rev=658880&r1=658879&r2=658880&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/LeaveGroupCommand.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/LeaveGroupCommand.java Wed May 21 14:20:48 2008
@@ -15,10 +15,16 @@
*/
package org.apache.axis2.clustering.control.wka;
-import org.apache.catalina.tribes.io.ChannelData;
+import org.apache.axis2.clustering.ClusteringFault;
+import org.apache.axis2.clustering.control.ControlCommand;
+import org.apache.axis2.context.ConfigurationContext;
/**
*
*/
-public class LeaveGroupCommand extends ChannelData {
+public class LeaveGroupCommand extends ControlCommand {
+ public void execute(ConfigurationContext configurationContext) throws ClusteringFault {
+ //TODO: Method implementation
+
+ }
}
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/MemberJoinedCommand.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/MemberJoinedCommand.java?rev=658880&r1=658879&r2=658880&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/MemberJoinedCommand.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/MemberJoinedCommand.java Wed May 21 14:20:48 2008
@@ -15,10 +15,46 @@
*/
package org.apache.axis2.clustering.control.wka;
-import org.apache.catalina.tribes.io.ChannelData;
+import org.apache.axis2.clustering.ClusteringFault;
+import org.apache.axis2.clustering.control.ControlCommand;
+import org.apache.axis2.clustering.tribes.MembershipManager;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.group.interceptors.StaticMembershipInterceptor;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.util.Arrays;
/**
*
*/
-public class MemberJoinedCommand extends ChannelData {
+public class MemberJoinedCommand extends ControlCommand {
+
+ private static final Log log = LogFactory.getLog(MemberJoinedCommand.class);
+ private Member member;
+ private MembershipManager membershipManager;
+ private StaticMembershipInterceptor staticMembershipInterceptor;
+
+ public void setMembershipManager(MembershipManager membershipManager) {
+ this.membershipManager = membershipManager;
+ }
+
+ public void setStaticMembershipInterceptor(
+ StaticMembershipInterceptor staticMembershipInterceptor) {
+ this.staticMembershipInterceptor = staticMembershipInterceptor;
+ }
+
+ public void setMember(Member member) {
+ this.member = member;
+ }
+
+ public void execute(ConfigurationContext configurationContext) throws ClusteringFault {
+ Member localMember = membershipManager.getLocalMember();
+ if (!(Arrays.equals(localMember.getHost(), member.getHost()) &&
+ localMember.getPort() == member.getPort())) {
+ membershipManager.memberAdded(member);
+ staticMembershipInterceptor.memberAdded(member);
+ }
+ }
}
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/MemberListCommand.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/MemberListCommand.java?rev=658880&r1=658879&r2=658880&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/MemberListCommand.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/MemberListCommand.java Wed May 21 14:20:48 2008
@@ -15,16 +15,45 @@
*/
package org.apache.axis2.clustering.control.wka;
-import org.apache.axis2.clustering.control.ControlCommand;
import org.apache.axis2.clustering.ClusteringFault;
+import org.apache.axis2.clustering.control.ControlCommand;
+import org.apache.axis2.clustering.tribes.MembershipManager;
import org.apache.axis2.context.ConfigurationContext;
+import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.group.interceptors.StaticMembershipInterceptor;
+
+import java.util.Arrays;
/**
*
*/
public class MemberListCommand extends ControlCommand {
+
+ private Member[] members;
+ private MembershipManager membershipManager;
+ private StaticMembershipInterceptor staticMembershipInterceptor;
+
+ public void setMembershipManager(MembershipManager membershipManager) {
+ this.membershipManager = membershipManager;
+ }
+
+ public void setStaticMembershipInterceptor(
+ StaticMembershipInterceptor staticMembershipInterceptor) {
+ this.staticMembershipInterceptor = staticMembershipInterceptor;
+ }
+
+ public void setMembers(Member[] members) {
+ this.members = members;
+ }
+
public void execute(ConfigurationContext configurationContext) throws ClusteringFault {
- //TODO: Method implementation
- System.out.println("#### MEMBER LIST CMD");
+ for (Member member : members) {
+ Member localMember = membershipManager.getLocalMember();
+ if (!(Arrays.equals(localMember.getHost(), member.getHost()) &&
+ localMember.getPort() == member.getPort())) {
+ membershipManager.memberAdded(member);
+ staticMembershipInterceptor.memberAdded(member);
+ }
+ }
}
}
Copied: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/RpcRequestHandler.java (from r658777, webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/InitializationRequestHandler.java)
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/RpcRequestHandler.java?p2=webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/RpcRequestHandler.java&p1=webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/InitializationRequestHandler.java&r1=658777&r2=658880&rev=658880&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/InitializationRequestHandler.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/RpcRequestHandler.java Wed May 21 14:20:48 2008
@@ -19,55 +19,137 @@
package org.apache.axis2.clustering.tribes;
+import org.apache.axis2.clustering.ClusteringConstants;
import org.apache.axis2.clustering.ClusteringFault;
-import org.apache.axis2.clustering.control.ControlCommand;
import org.apache.axis2.clustering.control.GetConfigurationCommand;
+import org.apache.axis2.clustering.control.GetConfigurationResponseCommand;
import org.apache.axis2.clustering.control.GetStateCommand;
+import org.apache.axis2.clustering.control.GetStateResponseCommand;
import org.apache.axis2.clustering.control.wka.JoinGroupCommand;
+import org.apache.axis2.clustering.control.wka.MemberJoinedCommand;
+import org.apache.axis2.clustering.control.wka.MemberListCommand;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.catalina.tribes.Channel;
import org.apache.catalina.tribes.Member;
import org.apache.catalina.tribes.RemoteProcessException;
import org.apache.catalina.tribes.group.RpcCallback;
+import org.apache.catalina.tribes.group.RpcChannel;
+import org.apache.catalina.tribes.group.interceptors.StaticMembershipInterceptor;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.io.Serializable;
/**
- * Handles initialization requests(GetConfiguration & GetState) from newly joining members
+ * Handles RPC Channel requests from members
*/
-public class InitializationRequestHandler implements RpcCallback {
+public class RpcRequestHandler implements RpcCallback {
- private static Log log = LogFactory.getLog(InitializationRequestHandler.class);
- private ControlCommandProcessor controlCommandProcessor;
+ private static Log log = LogFactory.getLog(RpcRequestHandler.class);
+ private ConfigurationContext configurationContext;
+ private MembershipManager membershipManager;
+ private StaticMembershipInterceptor staticMembershipInterceptor;
+ private RpcChannel rpcChannel;
+
+ public RpcRequestHandler(ConfigurationContext configurationContext,
+ MembershipManager membershipManager,
+ StaticMembershipInterceptor staticMembershipInterceptor) {
+ this.configurationContext = configurationContext;
+ this.membershipManager = membershipManager;
+ this.staticMembershipInterceptor = staticMembershipInterceptor;
+ }
+ public void setRpcChannel(RpcChannel rpcChannel) {
+ this.rpcChannel = rpcChannel;
+ }
- public InitializationRequestHandler(ControlCommandProcessor controlCommandProcessor) {
- this.controlCommandProcessor = controlCommandProcessor;
+ public void setConfigurationContext(ConfigurationContext configurationContext) {
+ this.configurationContext = configurationContext;
}
public Serializable replyRequest(Serializable msg, Member member) {
- if (msg instanceof GetStateCommand ||
- msg instanceof GetConfigurationCommand) {
+ if (msg instanceof GetStateCommand) {
+ // If a GetStateRequest is received by a node which has not yet initialized
+ // this node cannot send a response to the state requester. So we simply return.
+ if (configurationContext.
+ getPropertyNonReplicable(ClusteringConstants.CLUSTER_INITIALIZED) == null) {
+ return null;
+ }
try {
log.info("Received " + msg + " initialization request message from " +
TribesUtil.getHost(member));
- return controlCommandProcessor.process((ControlCommand) msg); // response is either GetConfigurationResponseCommand or GetStateResponseCommand
+ GetStateCommand command = (GetStateCommand) msg;
+ command.execute(configurationContext);
+ GetStateResponseCommand getStateRespCmd = new GetStateResponseCommand();
+ getStateRespCmd.setCommands(command.getCommands());
+ return getStateRespCmd;
} catch (ClusteringFault e) {
String errMsg = "Cannot handle initialization request";
log.error(errMsg, e);
throw new RemoteProcessException(errMsg, e);
}
- } else if (msg instanceof JoinGroupCommand) {
- log.info("Received " + msg + " from " + TribesUtil.getHost(member));
-// JoinGroupCommand command = (JoinGroupCommand) msg;
+ } else if (msg instanceof GetConfigurationCommand) {
+ // If a GetConfigurationCommand is received by a node which has not yet initialized
+ // this node cannot send a response to the state requester. So we simply return.
+ if (configurationContext.
+ getPropertyNonReplicable(ClusteringConstants.CLUSTER_INITIALIZED) == null) {
+ return null;
+ }
try {
- return controlCommandProcessor.process((ControlCommand) msg); // response is
+ log.info("Received " + msg + " initialization request message from " +
+ TribesUtil.getHost(member));
+ GetConfigurationCommand command = (GetConfigurationCommand) msg;
+ command.execute(configurationContext);
+ GetConfigurationResponseCommand
+ getConfigRespCmd = new GetConfigurationResponseCommand();
+ getConfigRespCmd.setServiceGroups(command.getServiceGroupNames());
+ return getConfigRespCmd;
} catch (ClusteringFault e) {
+ String errMsg = "Cannot handle initialization request";
+ log.error(errMsg, e);
+ throw new RemoteProcessException(errMsg, e);
+ }
+ } else if (msg instanceof JoinGroupCommand) {
+ log.info("Received JOIN message from " + TribesUtil.getHost(member));
+ MemberListCommand memListCmd;
+ try {
+ // Add the member
+ staticMembershipInterceptor.memberAdded(member);
+ membershipManager.memberAdded(member);
+
+ // Return the list of current members to the caller
+ memListCmd = new MemberListCommand();
+ memListCmd.setMembers(membershipManager.getMembers());
+
+ // Send a message to all other informing that a member has joined
+ MemberJoinedCommand memberJoinedCommand = new MemberJoinedCommand();
+ memberJoinedCommand.setMember(member);
+ rpcChannel.send(membershipManager.getMembers(),
+ memberJoinedCommand,
+ RpcChannel.ALL_REPLY,
+ Channel.SEND_OPTIONS_ASYNCHRONOUS,
+ 10000);
+ } catch (Exception e) {
String errMsg = "Cannot handle JOIN request";
log.error(errMsg, e);
throw new RemoteProcessException(errMsg, e);
}
+ return memListCmd;
+ } else if (msg instanceof MemberJoinedCommand) {
+ log.info("Received MEMBER_JOINED message from " + TribesUtil.getHost(member));
+ try {
+ MemberJoinedCommand command = (MemberJoinedCommand) msg;
+ command.setMembershipManager(membershipManager);
+ command.setStaticMembershipInterceptor(staticMembershipInterceptor);
+ command.execute(configurationContext);
+ } catch (ClusteringFault e) {
+ String errMsg = "Cannot handle MEMBER_JOINED notification";
+ log.error(errMsg, e);
+ throw new RemoteProcessException(errMsg, e);
+ }
}
+
+ //TODO: If a WKA member fails, it shud figure out the membership. The WKA member write membership to a local file
return null;
}
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java?rev=658880&r1=658879&r2=658880&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java Wed May 21 14:20:48 2008
@@ -34,6 +34,7 @@
import org.apache.axis2.clustering.control.GetConfigurationCommand;
import org.apache.axis2.clustering.control.GetStateCommand;
import org.apache.axis2.clustering.control.wka.JoinGroupCommand;
+import org.apache.axis2.clustering.control.wka.MemberListCommand;
import org.apache.axis2.context.ConfigurationContext;
import org.apache.axis2.description.HandlerDescription;
import org.apache.axis2.description.Parameter;
@@ -92,7 +93,7 @@
private ChannelListener channelListener;
private ChannelSender channelSender;
private MembershipManager membershipManager;
- private InitializationRequestHandler initializationRequestHandler;
+ private RpcRequestHandler rpcRequestHandler;
private StaticMembershipInterceptor staticMembershipInterceptor;
private org.apache.axis2.clustering.Member[] members;
@@ -163,12 +164,12 @@
// RpcChannel is a ChannelListener. When the reply to a particular request comes back, it
// picks it up. Each RPC is given a UUID, hence can correlate the request-response pair
- initializationRequestHandler = new InitializationRequestHandler(configurationContext,
- membershipManager,
- staticMembershipInterceptor);
- rpcChannel =
- new RpcChannel(domain, channel,
- initializationRequestHandler);
+ rpcRequestHandler = new RpcRequestHandler(configurationContext,
+ membershipManager,
+ staticMembershipInterceptor);
+ rpcChannel = new RpcChannel(domain, channel, rpcRequestHandler);
+ rpcRequestHandler.setRpcChannel(rpcChannel);
+
log.info("Local Member " + TribesUtil.getLocalHost(channel));
TribesUtil.printMembers(membershipManager);
@@ -182,7 +183,10 @@
Channel.SEND_OPTIONS_ASYNCHRONOUS,
10000);
if (responses.length > 0) {
- ((ControlCommand) responses[0].getMessage()).execute(configurationContext); // Do the initialization
+ MemberListCommand command = (MemberListCommand) responses[0].getMessage();
+ command.setMembershipManager(membershipManager);
+ command.setStaticMembershipInterceptor(staticMembershipInterceptor);
+ command.execute(configurationContext); // Do the initialization
}
} catch (ChannelException e) {
String msg = "Could not JOIN group";
@@ -527,7 +531,7 @@
// Add a AtMostOnceInterceptor to support at-most-once message processing semantics
AtMostOnceInterceptor atMostOnceInterceptor = new AtMostOnceInterceptor();
channel.addInterceptor(atMostOnceInterceptor);
-
+
// Add the OrderInterceptor to preserve sender ordering
OrderInterceptor orderInterceptor = new OrderInterceptor();
orderInterceptor.setOptionFlag(MSG_ORDER_OPTION);
@@ -742,8 +746,8 @@
public void setConfigurationContext(ConfigurationContext configurationContext) {
this.configurationContext = configurationContext;
- if (initializationRequestHandler != null) {
- initializationRequestHandler.setConfigurationContext(configurationContext);
+ if (rpcRequestHandler != null) {
+ rpcRequestHandler.setConfigurationContext(configurationContext);
}
if (channelListener != null) {
channelListener.setConfigurationContext(configurationContext);