You are viewing a plain text version of this content. The canonical link for it is here.
Posted to java-dev@axis.apache.org by az...@apache.org on 2008/05/17 16:00:32 UTC
svn commit: r657356 - in /webservices/axis2/trunk/java/modules:
clustering/src/org/apache/axis2/clustering/tribes/
kernel/src/org/apache/axis2/clustering/
kernel/src/org/apache/axis2/clustering/configuration/
Author: azeez
Date: Sat May 17 07:00:32 2008
New Revision: 657356
URL: http://svn.apache.org/viewvc?rev=657356&view=rev
Log:
1. code refactoring
2. javadocs added
Modified:
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelListener.java
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java
webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/ClusterManager.java
webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/configuration/ConfigurationManagerListener.java
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelListener.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelListener.java?rev=657356&r1=657355&r2=657356&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelListener.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelListener.java Sat May 17 07:00:32 2008
@@ -47,17 +47,14 @@
private DefaultContextManager contextManager;
private DefaultConfigurationManager configurationManager;
- private ControlCommandProcessor controlCommandProcessor;
private ConfigurationContext configurationContext;
public ChannelListener(ConfigurationContext configurationContext,
DefaultConfigurationManager configurationManager,
- DefaultContextManager contextManager,
- ControlCommandProcessor controlCommandProcessor) {
+ DefaultContextManager contextManager) {
this.configurationManager = configurationManager;
this.contextManager = contextManager;
- this.controlCommandProcessor = controlCommandProcessor;
this.configurationContext = configurationContext;
}
@@ -91,22 +88,22 @@
public void messageReceived(Serializable msg, Member sender) {
try {
AxisConfiguration configuration = configurationContext.getAxisConfiguration();
- List classLoaders = new ArrayList();
+ List<ClassLoader> classLoaders = new ArrayList<ClassLoader>();
classLoaders.add(configuration.getSystemClassLoader());
classLoaders.add(getClass().getClassLoader());
for (Iterator iter = configuration.getServiceGroups(); iter.hasNext();) {
AxisServiceGroup group = (AxisServiceGroup) iter.next();
classLoaders.add(group.getServiceGroupClassLoader());
}
- for (Iterator iter = configuration.getModules().values().iterator(); iter.hasNext();) {
- AxisModule module = (AxisModule) iter.next();
+ for(Object obj: configuration.getModules().values()){
+ AxisModule module = (AxisModule) obj;
classLoaders.add(module.getModuleClassLoader());
}
byte[] message = ((ByteMessage) msg).getMessage();
msg = XByteBuffer.deserialize(message,
0,
message.length,
- (ClassLoader[]) classLoaders.toArray(new ClassLoader[classLoaders.size()]));
+ classLoaders.toArray(new ClassLoader[classLoaders.size()]));
} catch (Exception e) {
String errMsg = "Cannot deserialize received message";
log.error(errMsg, e);
@@ -127,7 +124,7 @@
}
try {
- processMessage(msg, sender);
+ processMessage(msg);
} catch (Exception e) {
String errMsg = "Cannot process received message";
log.error(errMsg, e);
@@ -135,7 +132,7 @@
}
}
- private void processMessage(Serializable msg, Member sender) throws ClusteringFault {
+ private void processMessage(Serializable msg) throws ClusteringFault {
if (msg instanceof ContextClusteringCommand && contextManager != null) {
ContextClusteringCommand ctxCmd = (ContextClusteringCommand) msg;
ctxCmd.execute(configurationContext);
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java?rev=657356&r1=657355&r2=657356&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java Sat May 17 07:00:32 2008
@@ -118,75 +118,16 @@
* @throws ClusteringFault If initialization fails
*/
public void init() throws ClusteringFault {
-
- AxisConfiguration axisConfig = configurationContext.getAxisConfiguration();
- for (Object o : axisConfig.getInFlowPhases()) {
- Phase phase = (Phase) o;
- if (phase instanceof DispatchPhase) {
- RequestBlockingHandler requestBlockingHandler = new RequestBlockingHandler();
- if (!phase.getHandlers().contains(requestBlockingHandler)) {
- PhaseRule rule = new PhaseRule("Dispatch");
- rule.setAfter("SOAPMessageBodyBasedDispatcher");
- rule.setBefore("InstanceDispatcher");
- HandlerDescription handlerDesc = requestBlockingHandler.getHandlerDesc();
- handlerDesc.setHandler(requestBlockingHandler);
- handlerDesc.setName(ClusteringConstants.REQUEST_BLOCKING_HANDLER);
- handlerDesc.setRules(rule);
- phase.addHandler(requestBlockingHandler);
-
- log.info("Added " + ClusteringConstants.REQUEST_BLOCKING_HANDLER +
- " between SOAPMessageBodyBasedDispatcher & InstanceDispatcher to InFlow");
- break;
- }
- }
- }
- for (Object o : axisConfig.getInFaultFlowPhases()) {
- Phase phase = (Phase) o;
- if (phase instanceof DispatchPhase) {
- RequestBlockingHandler requestBlockingHandler = new RequestBlockingHandler();
- if (!phase.getHandlers().contains(requestBlockingHandler)) {
- PhaseRule rule = new PhaseRule("Dispatch");
- rule.setAfter("SOAPMessageBodyBasedDispatcher");
- rule.setBefore("InstanceDispatcher");
- HandlerDescription handlerDesc = requestBlockingHandler.getHandlerDesc();
- handlerDesc.setHandler(requestBlockingHandler);
- handlerDesc.setName(ClusteringConstants.REQUEST_BLOCKING_HANDLER);
- handlerDesc.setRules(rule);
- phase.addHandler(requestBlockingHandler);
-
- log.info("Added " + ClusteringConstants.REQUEST_BLOCKING_HANDLER +
- " between SOAPMessageBodyBasedDispatcher & InstanceDispatcher to InFaultFlow");
- break;
- }
- }
- }
+ log.info("Initializing cluster...");
+ addRequestBlockingHandlerToInFlows();
membershipManager = new MembershipManager();
channel = new GroupChannel();
channelSender = new ChannelSender(channel, membershipManager, synchronizeAllMembers());
- channelListener = new ChannelListener(configurationContext, configurationManager,
- contextManager, controlCmdProcessor);
-
- // Set the maximum number of retries, if message sending to a particular node fails
- Parameter maxRetriesParam = getParameter("maxRetries");
- int maxRetries = 10;
- if (maxRetriesParam != null) {
- maxRetries = Integer.parseInt((String) maxRetriesParam.getValue());
- }
- ReplicationTransmitter replicationTransmitter =
- (ReplicationTransmitter) channel.getChannelSender();
- MultiPointSender multiPointSender = replicationTransmitter.getTransport();
- multiPointSender.setMaxRetryAttempts(maxRetries);
+ channelListener =
+ new ChannelListener(configurationContext, configurationManager, contextManager);
- // Set the domain for this Node
- Parameter domainParam = getParameter(ClusteringConstants.DOMAIN);
- byte[] domain;
- if (domainParam != null) {
- domain = ((String) domainParam.getValue()).getBytes();
- } else {
- domain = "apache.axis2.domain".getBytes();
- }
- channel.getMembershipService().getProperties().setProperty("mcastClusterDomain",
- new String(domain));
+ setMaximumRetries();
+ byte[] domain = getClusterDomain();
Parameter membershipSchemeParam = getParameter("membershipScheme");
String membershipScheme = ClusteringConstants.MembershipScheme.MULTICAST_BASED;
@@ -249,6 +190,88 @@
configurationContext.
setNonReplicableProperty(ClusteringConstants.CLUSTER_INITIALIZED, "true");
+ log.info("Cluster initialization completed.");
+ }
+
+ /**
+ * Get the clustering domain to which this node belongs to
+ * @return The clustering domain to which this node belongs to
+ */
+ private byte[] getClusterDomain() {
+ Parameter domainParam = getParameter(ClusteringConstants.DOMAIN);
+ byte[] domain;
+ if (domainParam != null) {
+ domain = ((String) domainParam.getValue()).getBytes();
+ } else {
+ domain = "apache.axis2.domain".getBytes();
+ }
+ return domain;
+ }
+
+ /**
+ * Set the maximum number of retries, if message sending to a particular node fails
+ */
+ private void setMaximumRetries() {
+ Parameter maxRetriesParam = getParameter("maxRetries");
+ int maxRetries = 10;
+ if (maxRetriesParam != null) {
+ maxRetries = Integer.parseInt((String) maxRetriesParam.getValue());
+ }
+ ReplicationTransmitter replicationTransmitter =
+ (ReplicationTransmitter) channel.getChannelSender();
+ MultiPointSender multiPointSender = replicationTransmitter.getTransport();
+ multiPointSender.setMaxRetryAttempts(maxRetries);
+ }
+
+ /**
+ * A RequestBlockingHandler, which is an implementation of
+ * {@link org.apache.axis2.engine.Handler} is added to the InFlow & InFaultFlow. This handler
+ * is used for rejecting Web service requests until this node has been initialized. This handler
+ * can also be used for rejecting requests when this node is reinitializing or is in an
+ * inconsistent state (which can happen when a configuration change is taking place).
+ */
+ private void addRequestBlockingHandlerToInFlows() {
+ AxisConfiguration axisConfig = configurationContext.getAxisConfiguration();
+ for (Object o : axisConfig.getInFlowPhases()) {
+ Phase phase = (Phase) o;
+ if (phase instanceof DispatchPhase) {
+ RequestBlockingHandler requestBlockingHandler = new RequestBlockingHandler();
+ if (!phase.getHandlers().contains(requestBlockingHandler)) {
+ PhaseRule rule = new PhaseRule("Dispatch");
+ rule.setAfter("SOAPMessageBodyBasedDispatcher");
+ rule.setBefore("InstanceDispatcher");
+ HandlerDescription handlerDesc = requestBlockingHandler.getHandlerDesc();
+ handlerDesc.setHandler(requestBlockingHandler);
+ handlerDesc.setName(ClusteringConstants.REQUEST_BLOCKING_HANDLER);
+ handlerDesc.setRules(rule);
+ phase.addHandler(requestBlockingHandler);
+
+ log.info("Added " + ClusteringConstants.REQUEST_BLOCKING_HANDLER +
+ " between SOAPMessageBodyBasedDispatcher & InstanceDispatcher to InFlow");
+ break;
+ }
+ }
+ }
+ for (Object o : axisConfig.getInFaultFlowPhases()) {
+ Phase phase = (Phase) o;
+ if (phase instanceof DispatchPhase) {
+ RequestBlockingHandler requestBlockingHandler = new RequestBlockingHandler();
+ if (!phase.getHandlers().contains(requestBlockingHandler)) {
+ PhaseRule rule = new PhaseRule("Dispatch");
+ rule.setAfter("SOAPMessageBodyBasedDispatcher");
+ rule.setBefore("InstanceDispatcher");
+ HandlerDescription handlerDesc = requestBlockingHandler.getHandlerDesc();
+ handlerDesc.setHandler(requestBlockingHandler);
+ handlerDesc.setName(ClusteringConstants.REQUEST_BLOCKING_HANDLER);
+ handlerDesc.setRules(rule);
+ phase.addHandler(requestBlockingHandler);
+
+ log.info("Added " + ClusteringConstants.REQUEST_BLOCKING_HANDLER +
+ " between SOAPMessageBodyBasedDispatcher & InstanceDispatcher to InFaultFlow");
+ break;
+ }
+ }
+ }
}
/**
@@ -320,8 +343,8 @@
}
} else if (membershipScheme.equals(ClusteringConstants.MembershipScheme.MULTICAST_BASED)) {
log.info("Using multicast based membership management scheme");
- configureMulticastParameters(channel);
- } else {
+ configureMulticastParameters(channel, domain);
+ } else {
String msg = "Invalid membership scheme '" + membershipScheme +
"'. Supported schemes are multicast & wka";
log.error(msg);
@@ -389,8 +412,10 @@
* parameters
*
* @param channel The Tribes channel
+ * @param domain The clustering domain to which this node belongs to
*/
- private void configureMulticastParameters(ManagedChannel channel) {
+ private void configureMulticastParameters(ManagedChannel channel,
+ byte[] domain) {
Properties mcastProps = channel.getMembershipService().getProperties();
Parameter mcastAddress = getParameter("multicastAddress");
if (mcastAddress != null) {
@@ -435,7 +460,7 @@
receiver.setPort(Integer.parseInt(port));
}
- /*mcastProps.setProperty("mcastClusterDomain", "catalina");*/
+ mcastProps.setProperty("mcastClusterDomain", new String(domain));
}
/**
Modified: webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/ClusterManager.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/ClusterManager.java?rev=657356&r1=657355&r2=657356&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/ClusterManager.java (original)
+++ webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/ClusterManager.java Sat May 17 07:00:32 2008
@@ -31,13 +31,14 @@
* implemented.
* </p>
* <p>
- * The initilization of a node in the cluster is handled here. It is also responsible for getting this
- * node to join the cluster. This node should not process any Web services requests until it
+ * The initilization of a node in the cluster is handled here. It is also responsible for getting
+ * this node to join the cluster. This node should not process any Web services requests until it
* successfully joins the cluster. Generally, this node will also need to obtain the state
* information and/or configuration information from a neighboring node.
* This interface is also responsible for
* properly instantiating a {@link ContextManager} & {@link ConfigurationManager}. In the case of
- * a static <a href="http://afkham.org/2008/05/group-membership-management-schemes.html">membership scheme</a>,
+ * a static <a href="http://afkham.org/2008/05/group-membership-management-schemes.html">
+ * membership scheme</a>,
* this members are read from the axis2.xml file and added to the ClusterManager.
* </p>
* <p>
@@ -52,7 +53,8 @@
* </p>
* <p>
* There can also be several "parameter" elements, which are children of the "cluster" element
- * in the axis2.xml file. Generally, these parameters will be specific to the ClusterManager implementation.
+ * in the axis2.xml file. Generally, these parameters will be specific to the ClusterManager
+ * implementation.
* </p>
*/
public interface ClusterManager extends ParameterInclude {
@@ -79,11 +81,11 @@
* We can have a cluster with no context replication, in which case the contextManager will be
* null. This value is set by the {@link org.apache.axis2.deployment.ClusterBuilder}, by
* reading the "contextManager" element in the axis2.xml
- *
+ * <p/>
* e.g.
* <code>
* <b>
- * <contextManager class="org.apache.axis2.cluster.configuration.TribesContextManager">
+ * <contextManager class="org.apache.axis2.cluster.configuration.TribesContextManager">
* </b>
* </code>
*
@@ -96,7 +98,7 @@
* We can have a cluster with no configuration management, in which case the configurationManager
* will be null. This value is set by the {@link org.apache.axis2.deployment.ClusterBuilder}, by
* reading the "configurationManager" element in the axis2.xml
- *
+ * <p/>
* e.g.
* <code>
* <b>
Modified: webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/configuration/ConfigurationManagerListener.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/configuration/ConfigurationManagerListener.java?rev=657356&r1=657355&r2=657356&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/configuration/ConfigurationManagerListener.java (original)
+++ webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/configuration/ConfigurationManagerListener.java Sat May 17 07:00:32 2008
@@ -21,6 +21,9 @@
import org.apache.axis2.context.ConfigurationContext;
+/**
+ * This is the counterpart of {@link ConfigurationManager}
+ */
public interface ConfigurationManagerListener {
void serviceGroupsLoaded(ConfigurationClusteringCommand command);