You are viewing a plain text version of this content. The canonical link for it is here.
Posted to java-dev@axis.apache.org by az...@apache.org on 2008/05/21 20:02:54 UTC
svn commit: r658792 - in
/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering:
control/wka/JoinGroupCommand.java tribes/ControlCommandProcessor.java
tribes/InitializationRequestHandler.java tribes/TribesClusterManager.java
Author: azeez
Date: Wed May 21 11:02:53 2008
New Revision: 658792
URL: http://svn.apache.org/viewvc?rev=658792&view=rev
Log:
Add a member to the group when it sends a JOIN message to a well-known member
Removed:
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ControlCommandProcessor.java
Modified:
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/JoinGroupCommand.java
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/InitializationRequestHandler.java
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/JoinGroupCommand.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/JoinGroupCommand.java?rev=658792&r1=658791&r2=658792&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/JoinGroupCommand.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/JoinGroupCommand.java Wed May 21 11:02:53 2008
@@ -18,15 +18,17 @@
import org.apache.axis2.clustering.control.ControlCommand;
import org.apache.axis2.clustering.ClusteringFault;
import org.apache.axis2.context.ConfigurationContext;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
/**
*
*/
public class JoinGroupCommand extends ControlCommand {
- public void execute(ConfigurationContext configurationContext) throws ClusteringFault {
- //TODO: Method implementation
- System.out.println("### EXEC JoinGroupCommand");
- //todo: send member list, add to static member list, send member joined to others
+ private Log log = LogFactory.getLog(JoinGroupCommand.class);
+
+ public void execute(ConfigurationContext configurationContext) throws ClusteringFault {
+ log.info("JOIN request received");
}
}
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/InitializationRequestHandler.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/InitializationRequestHandler.java?rev=658792&r1=658791&r2=658792&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/InitializationRequestHandler.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/InitializationRequestHandler.java Wed May 21 11:02:53 2008
@@ -19,14 +19,20 @@
package org.apache.axis2.clustering.tribes;
+import org.apache.axis2.clustering.ClusteringConstants;
import org.apache.axis2.clustering.ClusteringFault;
import org.apache.axis2.clustering.control.ControlCommand;
import org.apache.axis2.clustering.control.GetConfigurationCommand;
+import org.apache.axis2.clustering.control.GetConfigurationResponseCommand;
import org.apache.axis2.clustering.control.GetStateCommand;
+import org.apache.axis2.clustering.control.GetStateResponseCommand;
import org.apache.axis2.clustering.control.wka.JoinGroupCommand;
+import org.apache.axis2.clustering.control.wka.MemberListCommand;
+import org.apache.axis2.context.ConfigurationContext;
import org.apache.catalina.tribes.Member;
import org.apache.catalina.tribes.RemoteProcessException;
import org.apache.catalina.tribes.group.RpcCallback;
+import org.apache.catalina.tribes.group.interceptors.StaticMembershipInterceptor;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -38,35 +44,71 @@
public class InitializationRequestHandler implements RpcCallback {
private static Log log = LogFactory.getLog(InitializationRequestHandler.class);
- private ControlCommandProcessor controlCommandProcessor;
-
+ private ConfigurationContext configurationContext;
+ private MembershipManager membershipManager;
+ private StaticMembershipInterceptor staticMembershipInterceptor;
+
+ public InitializationRequestHandler(ConfigurationContext configurationContext, MembershipManager membershipManager,
+ StaticMembershipInterceptor staticMembershipInterceptor) {
+ this.configurationContext = configurationContext;
+ this.membershipManager = membershipManager;
+ this.staticMembershipInterceptor = staticMembershipInterceptor;
+ }
- public InitializationRequestHandler(ControlCommandProcessor controlCommandProcessor) {
- this.controlCommandProcessor = controlCommandProcessor;
+ public void setConfigurationContext(ConfigurationContext configurationContext) {
+ this.configurationContext = configurationContext;
}
public Serializable replyRequest(Serializable msg, Member member) {
- if (msg instanceof GetStateCommand ||
- msg instanceof GetConfigurationCommand) {
+ if (msg instanceof GetStateCommand) {
+ // If a GetStateRequest is received by a node which has not yet initialized
+ // this node cannot send a response to the state requester. So we simply return.
+ if (configurationContext.
+ getPropertyNonReplicable(ClusteringConstants.CLUSTER_INITIALIZED) == null) {
+ return null;
+ }
try {
log.info("Received " + msg + " initialization request message from " +
TribesUtil.getHost(member));
- return controlCommandProcessor.process((ControlCommand) msg); // response is either GetConfigurationResponseCommand or GetStateResponseCommand
+ GetStateCommand command = (GetStateCommand) msg;
+ command.execute(configurationContext);
+ GetStateResponseCommand getStateRespCmd = new GetStateResponseCommand();
+ getStateRespCmd.setCommands(((GetStateCommand) command).getCommands());
+ return getStateRespCmd;
} catch (ClusteringFault e) {
String errMsg = "Cannot handle initialization request";
log.error(errMsg, e);
throw new RemoteProcessException(errMsg, e);
}
- } else if (msg instanceof JoinGroupCommand) {
- log.info("Received " + msg + " from " + TribesUtil.getHost(member));
-// JoinGroupCommand command = (JoinGroupCommand) msg;
+ } else if (msg instanceof GetConfigurationCommand) {
+ // If a GetConfigurationCommand is received by a node which has not yet initialized
+ // this node cannot send a response to the state requester. So we simply return.
+ if (configurationContext.
+ getPropertyNonReplicable(ClusteringConstants.CLUSTER_INITIALIZED) == null) {
+ return null;
+ }
try {
- return controlCommandProcessor.process((ControlCommand) msg); // response is
+ log.info("Received " + msg + " initialization request message from " +
+ TribesUtil.getHost(member));
+ GetConfigurationCommand command = (GetConfigurationCommand) msg;
+ command.execute(configurationContext);
+ GetConfigurationResponseCommand
+ getConfigRespCmd = new GetConfigurationResponseCommand();
+ getConfigRespCmd.setServiceGroups(command.getServiceGroupNames());
+ return getConfigRespCmd;
} catch (ClusteringFault e) {
- String errMsg = "Cannot handle JOIN request";
+ String errMsg = "Cannot handle initialization request";
log.error(errMsg, e);
throw new RemoteProcessException(errMsg, e);
}
+ } else if (msg instanceof JoinGroupCommand) {
+ log.info("Received JOIN message from " + TribesUtil.getHost(member));
+ staticMembershipInterceptor.memberAdded(member);
+ membershipManager.memberAdded(member);
+
+ MemberListCommand memListCmd = new MemberListCommand();
+ //todo populate list
+ return memListCmd;
}
return null;
}
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java?rev=658792&r1=658791&r2=658792&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java Wed May 21 11:02:53 2008
@@ -89,16 +89,15 @@
private ManagedChannel channel;
private RpcChannel rpcChannel;
private ConfigurationContext configurationContext;
- private ControlCommandProcessor controlCmdProcessor;
private ChannelListener channelListener;
private ChannelSender channelSender;
private MembershipManager membershipManager;
+ private InitializationRequestHandler initializationRequestHandler;
private StaticMembershipInterceptor staticMembershipInterceptor;
private org.apache.axis2.clustering.Member[] members;
public TribesClusterManager() {
parameters = new HashMap<String, Parameter>();
- controlCmdProcessor = new ControlCommandProcessor(configurationContext);
}
public void setMembers(org.apache.axis2.clustering.Member[] members) {
@@ -164,9 +163,12 @@
// RpcChannel is a ChannelListener. When the reply to a particular request comes back, it
// picks it up. Each RPC is given a UUID, hence can correlate the request-response pair
+ initializationRequestHandler = new InitializationRequestHandler(configurationContext,
+ membershipManager,
+ staticMembershipInterceptor);
rpcChannel =
new RpcChannel(domain, channel,
- new InitializationRequestHandler(controlCmdProcessor));
+ initializationRequestHandler);
log.info("Local Member " + TribesUtil.getLocalHost(channel));
TribesUtil.printMembers(membershipManager);
@@ -184,7 +186,7 @@
}
} catch (ChannelException e) {
String msg = "Could not JOIN group";
- log.error(e);
+ log.error(msg, e);
throw new ClusteringFault(msg, e);
}
}
@@ -522,6 +524,10 @@
nbc.setPrevious(dfi);
channel.addInterceptor(nbc);*/
+ // Add a AtMostOnceInterceptor to support at-most-once message processing semantics
+ AtMostOnceInterceptor atMostOnceInterceptor = new AtMostOnceInterceptor();
+ channel.addInterceptor(atMostOnceInterceptor);
+
// Add the OrderInterceptor to preserve sender ordering
OrderInterceptor orderInterceptor = new OrderInterceptor();
orderInterceptor.setOptionFlag(MSG_ORDER_OPTION);
@@ -536,10 +542,6 @@
staticMembershipInterceptor = new StaticMembershipInterceptor();
channel.addInterceptor(staticMembershipInterceptor);
}
-
- // Add a AtMostOnceInterceptor to support at-most-once message processing semantics
- AtMostOnceInterceptor atMostOnceInterceptor = new AtMostOnceInterceptor();
- channel.addInterceptor(atMostOnceInterceptor);
}
/**
@@ -740,7 +742,9 @@
public void setConfigurationContext(ConfigurationContext configurationContext) {
this.configurationContext = configurationContext;
- controlCmdProcessor.setConfigurationContext(configurationContext);
+ if (initializationRequestHandler != null) {
+ initializationRequestHandler.setConfigurationContext(configurationContext);
+ }
if (channelListener != null) {
channelListener.setConfigurationContext(configurationContext);
}