You are viewing a plain text version of this content. The canonical link for it is here.
Posted to java-dev@axis.apache.org by az...@apache.org on 2008/05/17 16:00:32 UTC

svn commit: r657356 - in /webservices/axis2/trunk/java/modules: clustering/src/org/apache/axis2/clustering/tribes/ kernel/src/org/apache/axis2/clustering/ kernel/src/org/apache/axis2/clustering/configuration/

Author: azeez
Date: Sat May 17 07:00:32 2008
New Revision: 657356

URL: http://svn.apache.org/viewvc?rev=657356&view=rev
Log:
1. code refactoring
2. javadocs added


Modified:
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelListener.java
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java
    webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/ClusterManager.java
    webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/configuration/ConfigurationManagerListener.java

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelListener.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelListener.java?rev=657356&r1=657355&r2=657356&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelListener.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelListener.java Sat May 17 07:00:32 2008
@@ -47,17 +47,14 @@
 
     private DefaultContextManager contextManager;
     private DefaultConfigurationManager configurationManager;
-    private ControlCommandProcessor controlCommandProcessor;
 
     private ConfigurationContext configurationContext;
 
     public ChannelListener(ConfigurationContext configurationContext,
                            DefaultConfigurationManager configurationManager,
-                           DefaultContextManager contextManager,
-                           ControlCommandProcessor controlCommandProcessor) {
+                           DefaultContextManager contextManager) {
         this.configurationManager = configurationManager;
         this.contextManager = contextManager;
-        this.controlCommandProcessor = controlCommandProcessor;
         this.configurationContext = configurationContext;
     }
 
@@ -91,22 +88,22 @@
     public void messageReceived(Serializable msg, Member sender) {
         try {
             AxisConfiguration configuration = configurationContext.getAxisConfiguration();
-            List classLoaders = new ArrayList();
+            List<ClassLoader> classLoaders = new ArrayList<ClassLoader>();
             classLoaders.add(configuration.getSystemClassLoader());
             classLoaders.add(getClass().getClassLoader());
             for (Iterator iter = configuration.getServiceGroups(); iter.hasNext();) {
                 AxisServiceGroup group = (AxisServiceGroup) iter.next();
                 classLoaders.add(group.getServiceGroupClassLoader());
             }
-            for (Iterator iter = configuration.getModules().values().iterator(); iter.hasNext();) {
-                AxisModule module = (AxisModule) iter.next();
+            for(Object obj: configuration.getModules().values()){    
+                AxisModule module = (AxisModule) obj;
                 classLoaders.add(module.getModuleClassLoader());
             }
             byte[] message = ((ByteMessage) msg).getMessage();
             msg = XByteBuffer.deserialize(message,
                                           0,
                                           message.length,
-                                          (ClassLoader[]) classLoaders.toArray(new ClassLoader[classLoaders.size()]));
+                                          classLoaders.toArray(new ClassLoader[classLoaders.size()]));
         } catch (Exception e) {
             String errMsg = "Cannot deserialize received message";
             log.error(errMsg, e);
@@ -127,7 +124,7 @@
         }
 
         try {
-            processMessage(msg, sender);
+            processMessage(msg);
         } catch (Exception e) {
             String errMsg = "Cannot process received message";
             log.error(errMsg, e);
@@ -135,7 +132,7 @@
         }
     }
 
-    private void processMessage(Serializable msg, Member sender) throws ClusteringFault {
+    private void processMessage(Serializable msg) throws ClusteringFault {
         if (msg instanceof ContextClusteringCommand && contextManager != null) {
             ContextClusteringCommand ctxCmd = (ContextClusteringCommand) msg;
             ctxCmd.execute(configurationContext);

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java?rev=657356&r1=657355&r2=657356&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java Sat May 17 07:00:32 2008
@@ -118,75 +118,16 @@
      * @throws ClusteringFault If initialization fails
      */
     public void init() throws ClusteringFault {
-
-        AxisConfiguration axisConfig = configurationContext.getAxisConfiguration();
-        for (Object o : axisConfig.getInFlowPhases()) {
-            Phase phase = (Phase) o;
-            if (phase instanceof DispatchPhase) {
-                RequestBlockingHandler requestBlockingHandler = new RequestBlockingHandler();
-                if (!phase.getHandlers().contains(requestBlockingHandler)) {
-                    PhaseRule rule = new PhaseRule("Dispatch");
-                    rule.setAfter("SOAPMessageBodyBasedDispatcher");
-                    rule.setBefore("InstanceDispatcher");
-                    HandlerDescription handlerDesc = requestBlockingHandler.getHandlerDesc();
-                    handlerDesc.setHandler(requestBlockingHandler);
-                    handlerDesc.setName(ClusteringConstants.REQUEST_BLOCKING_HANDLER);
-                    handlerDesc.setRules(rule);
-                    phase.addHandler(requestBlockingHandler);
-
-                    log.info("Added " + ClusteringConstants.REQUEST_BLOCKING_HANDLER +
-                             " between SOAPMessageBodyBasedDispatcher & InstanceDispatcher to InFlow");
-                    break;
-                }
-            }
-        }
-        for (Object o : axisConfig.getInFaultFlowPhases()) {
-            Phase phase = (Phase) o;
-            if (phase instanceof DispatchPhase) {
-                RequestBlockingHandler requestBlockingHandler = new RequestBlockingHandler();
-                if (!phase.getHandlers().contains(requestBlockingHandler)) {
-                    PhaseRule rule = new PhaseRule("Dispatch");
-                    rule.setAfter("SOAPMessageBodyBasedDispatcher");
-                    rule.setBefore("InstanceDispatcher");
-                    HandlerDescription handlerDesc = requestBlockingHandler.getHandlerDesc();
-                    handlerDesc.setHandler(requestBlockingHandler);
-                    handlerDesc.setName(ClusteringConstants.REQUEST_BLOCKING_HANDLER);
-                    handlerDesc.setRules(rule);
-                    phase.addHandler(requestBlockingHandler);
-
-                    log.info("Added " + ClusteringConstants.REQUEST_BLOCKING_HANDLER +
-                             " between SOAPMessageBodyBasedDispatcher & InstanceDispatcher to InFaultFlow");
-                    break;
-                }
-            }
-        }
+        log.info("Initializing cluster...");
+        addRequestBlockingHandlerToInFlows();
         membershipManager = new MembershipManager();
         channel = new GroupChannel();
         channelSender = new ChannelSender(channel, membershipManager, synchronizeAllMembers());
-        channelListener = new ChannelListener(configurationContext, configurationManager,
-                                              contextManager, controlCmdProcessor);
-
-        // Set the maximum number of retries, if message sending to a particular node fails
-        Parameter maxRetriesParam = getParameter("maxRetries");
-        int maxRetries = 10;
-        if (maxRetriesParam != null) {
-            maxRetries = Integer.parseInt((String) maxRetriesParam.getValue());
-        }
-        ReplicationTransmitter replicationTransmitter =
-                (ReplicationTransmitter) channel.getChannelSender();
-        MultiPointSender multiPointSender = replicationTransmitter.getTransport();
-        multiPointSender.setMaxRetryAttempts(maxRetries);
+        channelListener =
+                new ChannelListener(configurationContext, configurationManager, contextManager);
 
-        // Set the domain for this Node
-        Parameter domainParam = getParameter(ClusteringConstants.DOMAIN);
-        byte[] domain;
-        if (domainParam != null) {
-            domain = ((String) domainParam.getValue()).getBytes();
-        } else {
-            domain = "apache.axis2.domain".getBytes();
-        }
-        channel.getMembershipService().getProperties().setProperty("mcastClusterDomain",
-                                                                   new String(domain));
+        setMaximumRetries();
+        byte[] domain = getClusterDomain();
 
         Parameter membershipSchemeParam = getParameter("membershipScheme");
         String membershipScheme = ClusteringConstants.MembershipScheme.MULTICAST_BASED;
@@ -249,6 +190,88 @@
 
         configurationContext.
                 setNonReplicableProperty(ClusteringConstants.CLUSTER_INITIALIZED, "true");
+        log.info("Cluster initialization completed.");
+    }
+
+    /**
+     * Get the clustering domain to which this node belongs to
+     * @return The clustering domain to which this node belongs to
+     */
+    private byte[] getClusterDomain() {
+        Parameter domainParam = getParameter(ClusteringConstants.DOMAIN);
+        byte[] domain;
+        if (domainParam != null) {
+            domain = ((String) domainParam.getValue()).getBytes();
+        } else {
+            domain = "apache.axis2.domain".getBytes();
+        }
+        return domain;
+    }
+
+    /**
+     * Set the maximum number of retries, if message sending to a particular node fails
+     */
+    private void setMaximumRetries() {
+        Parameter maxRetriesParam = getParameter("maxRetries");
+        int maxRetries = 10;
+        if (maxRetriesParam != null) {
+            maxRetries = Integer.parseInt((String) maxRetriesParam.getValue());
+        }
+        ReplicationTransmitter replicationTransmitter =
+                (ReplicationTransmitter) channel.getChannelSender();
+        MultiPointSender multiPointSender = replicationTransmitter.getTransport();
+        multiPointSender.setMaxRetryAttempts(maxRetries);
+    }
+
+    /**
+     * A RequestBlockingHandler, which is an implementation of
+     * {@link org.apache.axis2.engine.Handler} is added to the InFlow & InFaultFlow. This handler
+     * is used for rejecting Web service requests until this node has been initialized. This handler
+     * can also be used for rejecting requests when this node is reinitializing or is in an
+     * inconsistent state (which can happen when a configuration change is taking place).
+     */
+    private void addRequestBlockingHandlerToInFlows() {
+        AxisConfiguration axisConfig = configurationContext.getAxisConfiguration();
+        for (Object o : axisConfig.getInFlowPhases()) {
+            Phase phase = (Phase) o;
+            if (phase instanceof DispatchPhase) {
+                RequestBlockingHandler requestBlockingHandler = new RequestBlockingHandler();
+                if (!phase.getHandlers().contains(requestBlockingHandler)) {
+                    PhaseRule rule = new PhaseRule("Dispatch");
+                    rule.setAfter("SOAPMessageBodyBasedDispatcher");
+                    rule.setBefore("InstanceDispatcher");
+                    HandlerDescription handlerDesc = requestBlockingHandler.getHandlerDesc();
+                    handlerDesc.setHandler(requestBlockingHandler);
+                    handlerDesc.setName(ClusteringConstants.REQUEST_BLOCKING_HANDLER);
+                    handlerDesc.setRules(rule);
+                    phase.addHandler(requestBlockingHandler);
+
+                    log.info("Added " + ClusteringConstants.REQUEST_BLOCKING_HANDLER +
+                             " between SOAPMessageBodyBasedDispatcher & InstanceDispatcher to InFlow");
+                    break;
+                }
+            }
+        }
+        for (Object o : axisConfig.getInFaultFlowPhases()) {
+            Phase phase = (Phase) o;
+            if (phase instanceof DispatchPhase) {
+                RequestBlockingHandler requestBlockingHandler = new RequestBlockingHandler();
+                if (!phase.getHandlers().contains(requestBlockingHandler)) {
+                    PhaseRule rule = new PhaseRule("Dispatch");
+                    rule.setAfter("SOAPMessageBodyBasedDispatcher");
+                    rule.setBefore("InstanceDispatcher");
+                    HandlerDescription handlerDesc = requestBlockingHandler.getHandlerDesc();
+                    handlerDesc.setHandler(requestBlockingHandler);
+                    handlerDesc.setName(ClusteringConstants.REQUEST_BLOCKING_HANDLER);
+                    handlerDesc.setRules(rule);
+                    phase.addHandler(requestBlockingHandler);
+
+                    log.info("Added " + ClusteringConstants.REQUEST_BLOCKING_HANDLER +
+                             " between SOAPMessageBodyBasedDispatcher & InstanceDispatcher to InFaultFlow");
+                    break;
+                }
+            }
+        }
     }
 
     /**
@@ -320,8 +343,8 @@
             }
         } else if (membershipScheme.equals(ClusteringConstants.MembershipScheme.MULTICAST_BASED)) {
             log.info("Using multicast based membership management scheme");
-            configureMulticastParameters(channel);
-        } else {
+            configureMulticastParameters(channel, domain);
+        } else {                                        
             String msg = "Invalid membership scheme '" + membershipScheme +
                          "'. Supported schemes are multicast & wka";
             log.error(msg);
@@ -389,8 +412,10 @@
      * parameters
      *
      * @param channel The Tribes channel
+     * @param domain The clustering domain to which this node belongs to
      */
-    private void configureMulticastParameters(ManagedChannel channel) {
+    private void configureMulticastParameters(ManagedChannel channel,
+                                              byte[] domain) {
         Properties mcastProps = channel.getMembershipService().getProperties();
         Parameter mcastAddress = getParameter("multicastAddress");
         if (mcastAddress != null) {
@@ -435,7 +460,7 @@
             receiver.setPort(Integer.parseInt(port));
         }
 
-        /*mcastProps.setProperty("mcastClusterDomain", "catalina");*/
+        mcastProps.setProperty("mcastClusterDomain", new String(domain));
     }
 
     /**

Modified: webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/ClusterManager.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/ClusterManager.java?rev=657356&r1=657355&r2=657356&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/ClusterManager.java (original)
+++ webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/ClusterManager.java Sat May 17 07:00:32 2008
@@ -31,13 +31,14 @@
  * implemented.
  * </p>
  * <p>
- * The initilization of a node in the cluster is handled here. It is also responsible for getting this
- * node to join the cluster. This node should not process any Web services requests until it
+ * The initilization of a node in the cluster is handled here. It is also responsible for getting
+ * this node to join the cluster. This node should not process any Web services requests until it
  * successfully joins the cluster. Generally, this node will also need to obtain the state
  * information and/or configuration information from a neighboring node.
  * This interface is also responsible for
  * properly instantiating a {@link ContextManager} & {@link ConfigurationManager}. In the case of
- * a static <a href="http://afkham.org/2008/05/group-membership-management-schemes.html">membership scheme</a>,
+ * a static <a href="http://afkham.org/2008/05/group-membership-management-schemes.html">
+ * membership scheme</a>,
  * this members are read from the axis2.xml file and added to the ClusterManager.
  * </p>
  * <p>
@@ -52,7 +53,8 @@
  * </p>
  * <p>
  * There can also be several "parameter" elements, which are children of the "cluster" element
- * in the axis2.xml file. Generally, these parameters will be specific to the ClusterManager implementation.
+ * in the axis2.xml file. Generally, these parameters will be specific to the ClusterManager
+ * implementation.
  * </p>
  */
 public interface ClusterManager extends ParameterInclude {
@@ -79,11 +81,11 @@
      * We can have a cluster with no context replication, in which case the contextManager will be
      * null. This value is set by the {@link org.apache.axis2.deployment.ClusterBuilder}, by
      * reading the  "contextManager" element in the axis2.xml
-     *
+     * <p/>
      * e.g.
      * <code>
      * <b>
-     * <contextManager class="org.apache.axis2.cluster.configuration.TribesContextManager"> 
+     * <contextManager class="org.apache.axis2.cluster.configuration.TribesContextManager">
      * </b>
      * </code>
      *
@@ -96,7 +98,7 @@
      * We can have a cluster with no configuration management, in which case the configurationManager
      * will be null. This value is set by the {@link org.apache.axis2.deployment.ClusterBuilder}, by
      * reading the  "configurationManager" element in the axis2.xml
-     *
+     * <p/>
      * e.g.
      * <code>
      * <b>

Modified: webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/configuration/ConfigurationManagerListener.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/configuration/ConfigurationManagerListener.java?rev=657356&r1=657355&r2=657356&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/configuration/ConfigurationManagerListener.java (original)
+++ webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/configuration/ConfigurationManagerListener.java Sat May 17 07:00:32 2008
@@ -21,6 +21,9 @@
 
 import org.apache.axis2.context.ConfigurationContext;
 
+/**
+ * This is the counterpart of {@link ConfigurationManager}
+ */
 public interface ConfigurationManagerListener {
 
     void serviceGroupsLoaded(ConfigurationClusteringCommand command);