You are viewing a plain text version of this content. The canonical link for it is here.
Posted to axis-cvs@ws.apache.org by az...@apache.org on 2008/05/21 19:35:59 UTC

svn commit: r658777 - in /webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering: control/wka/ tribes/

Author: azeez
Date: Wed May 21 10:35:58 2008
New Revision: 658777

URL: http://svn.apache.org/viewvc?rev=658777&view=rev
Log:
Initial stuff on WKA based membership discovery


Added:
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/JoinGroupCommand.java
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/LeaveGroupCommand.java
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/MemberJoinedCommand.java
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/MemberListCommand.java
Modified:
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ControlCommandProcessor.java
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/InitializationRequestHandler.java
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesConstants.java

Added: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/JoinGroupCommand.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/JoinGroupCommand.java?rev=658777&view=auto
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/JoinGroupCommand.java (added)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/JoinGroupCommand.java Wed May 21 10:35:58 2008
@@ -0,0 +1,32 @@
+/*                                                                             
+ * Copyright 2004,2005 The Apache Software Foundation.                         
+ *                                                                             
+ * Licensed under the Apache License, Version 2.0 (the "License");             
+ * you may not use this file except in compliance with the License.            
+ * You may obtain a copy of the License at                                     
+ *                                                                             
+ *      http://www.apache.org/licenses/LICENSE-2.0                             
+ *                                                                             
+ * Unless required by applicable law or agreed to in writing, software         
+ * distributed under the License is distributed on an "AS IS" BASIS,           
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.    
+ * See the License for the specific language governing permissions and         
+ * limitations under the License.                                              
+ */
+package org.apache.axis2.clustering.control.wka;
+
+import org.apache.axis2.clustering.control.ControlCommand;
+import org.apache.axis2.clustering.ClusteringFault;
+import org.apache.axis2.context.ConfigurationContext;
+
+/**
+ *
+ */
+public class JoinGroupCommand extends ControlCommand {
+    public void execute(ConfigurationContext configurationContext) throws ClusteringFault {
+        //TODO: Method implementation
+        System.out.println("### EXEC JoinGroupCommand");
+
+        //todo: send member list, add to static member list, send member joined to others
+    }
+}

Added: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/LeaveGroupCommand.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/LeaveGroupCommand.java?rev=658777&view=auto
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/LeaveGroupCommand.java (added)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/LeaveGroupCommand.java Wed May 21 10:35:58 2008
@@ -0,0 +1,24 @@
+/*                                                                             
+ * Copyright 2004,2005 The Apache Software Foundation.                         
+ *                                                                             
+ * Licensed under the Apache License, Version 2.0 (the "License");             
+ * you may not use this file except in compliance with the License.            
+ * You may obtain a copy of the License at                                     
+ *                                                                             
+ *      http://www.apache.org/licenses/LICENSE-2.0                             
+ *                                                                             
+ * Unless required by applicable law or agreed to in writing, software         
+ * distributed under the License is distributed on an "AS IS" BASIS,           
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.    
+ * See the License for the specific language governing permissions and         
+ * limitations under the License.                                              
+ */
+package org.apache.axis2.clustering.control.wka;
+
+import org.apache.catalina.tribes.io.ChannelData;
+
+/**
+ *
+ */
+public class LeaveGroupCommand extends ChannelData {
+}

Added: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/MemberJoinedCommand.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/MemberJoinedCommand.java?rev=658777&view=auto
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/MemberJoinedCommand.java (added)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/MemberJoinedCommand.java Wed May 21 10:35:58 2008
@@ -0,0 +1,24 @@
+/*                                                                             
+ * Copyright 2004,2005 The Apache Software Foundation.                         
+ *                                                                             
+ * Licensed under the Apache License, Version 2.0 (the "License");             
+ * you may not use this file except in compliance with the License.            
+ * You may obtain a copy of the License at                                     
+ *                                                                             
+ *      http://www.apache.org/licenses/LICENSE-2.0                             
+ *                                                                             
+ * Unless required by applicable law or agreed to in writing, software         
+ * distributed under the License is distributed on an "AS IS" BASIS,           
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.    
+ * See the License for the specific language governing permissions and         
+ * limitations under the License.                                              
+ */
+package org.apache.axis2.clustering.control.wka;
+
+import org.apache.catalina.tribes.io.ChannelData;
+
+/**
+ *
+ */
+public class MemberJoinedCommand extends ChannelData {
+}

Added: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/MemberListCommand.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/MemberListCommand.java?rev=658777&view=auto
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/MemberListCommand.java (added)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/MemberListCommand.java Wed May 21 10:35:58 2008
@@ -0,0 +1,30 @@
+/*                                                                             
+ * Copyright 2004,2005 The Apache Software Foundation.                         
+ *                                                                             
+ * Licensed under the Apache License, Version 2.0 (the "License");             
+ * you may not use this file except in compliance with the License.            
+ * You may obtain a copy of the License at                                     
+ *                                                                             
+ *      http://www.apache.org/licenses/LICENSE-2.0                             
+ *                                                                             
+ * Unless required by applicable law or agreed to in writing, software         
+ * distributed under the License is distributed on an "AS IS" BASIS,           
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.    
+ * See the License for the specific language governing permissions and         
+ * limitations under the License.                                              
+ */
+package org.apache.axis2.clustering.control.wka;
+
+import org.apache.axis2.clustering.control.ControlCommand;
+import org.apache.axis2.clustering.ClusteringFault;
+import org.apache.axis2.context.ConfigurationContext;
+
+/**
+ *
+ */
+public class MemberListCommand extends ControlCommand {
+    public void execute(ConfigurationContext configurationContext) throws ClusteringFault {
+        //TODO: Method implementation
+        System.out.println("#### MEMBER LIST CMD");
+    }
+}

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ControlCommandProcessor.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ControlCommandProcessor.java?rev=658777&r1=658776&r2=658777&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ControlCommandProcessor.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ControlCommandProcessor.java Wed May 21 10:35:58 2008
@@ -26,10 +26,12 @@
 import org.apache.axis2.clustering.control.GetConfigurationResponseCommand;
 import org.apache.axis2.clustering.control.GetStateCommand;
 import org.apache.axis2.clustering.control.GetStateResponseCommand;
+import org.apache.axis2.clustering.control.wka.JoinGroupCommand;
+import org.apache.axis2.clustering.control.wka.MemberListCommand;
 import org.apache.axis2.context.ConfigurationContext;
 
 /**
- * 
+ *
  */
 public class ControlCommandProcessor {
     private ConfigurationContext configurationContext;
@@ -70,6 +72,10 @@
             getConfigRespCmd.
                     setServiceGroups(((GetConfigurationCommand) command).getServiceGroupNames());
             return getConfigRespCmd;
+        } else if (command instanceof JoinGroupCommand) {
+
+            command.execute(configurationContext);
+            return new MemberListCommand();
         }
         return null;
     }

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/InitializationRequestHandler.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/InitializationRequestHandler.java?rev=658777&r1=658776&r2=658777&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/InitializationRequestHandler.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/InitializationRequestHandler.java Wed May 21 10:35:58 2008
@@ -23,6 +23,7 @@
 import org.apache.axis2.clustering.control.ControlCommand;
 import org.apache.axis2.clustering.control.GetConfigurationCommand;
 import org.apache.axis2.clustering.control.GetStateCommand;
+import org.apache.axis2.clustering.control.wka.JoinGroupCommand;
 import org.apache.catalina.tribes.Member;
 import org.apache.catalina.tribes.RemoteProcessException;
 import org.apache.catalina.tribes.group.RpcCallback;
@@ -45,16 +46,27 @@
     }
 
     public Serializable replyRequest(Serializable msg, Member member) {
-        if (msg instanceof GetStateCommand || msg instanceof GetConfigurationCommand) {
+        if (msg instanceof GetStateCommand ||
+            msg instanceof GetConfigurationCommand) {
             try {
                 log.info("Received " + msg + " initialization request message from " +
                          TribesUtil.getHost(member));
-                return controlCommandProcessor.process((ControlCommand) msg);
+                return controlCommandProcessor.process((ControlCommand) msg); // response is either GetConfigurationResponseCommand or GetStateResponseCommand
             } catch (ClusteringFault e) {
                 String errMsg = "Cannot handle initialization request";
                 log.error(errMsg, e);
                 throw new RemoteProcessException(errMsg, e);
             }
+        } else if (msg instanceof JoinGroupCommand) {
+            log.info("Received " + msg + " from " + TribesUtil.getHost(member));
+//            JoinGroupCommand command = (JoinGroupCommand) msg;
+            try {
+                return controlCommandProcessor.process((ControlCommand) msg); // response is 
+            } catch (ClusteringFault e) {
+                String errMsg = "Cannot handle JOIN request";
+                log.error(errMsg, e);
+                throw new RemoteProcessException(errMsg, e);
+            }
         }
         return null;
     }

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java?rev=658777&r1=658776&r2=658777&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java Wed May 21 10:35:58 2008
@@ -33,6 +33,7 @@
 import org.apache.axis2.clustering.control.ControlCommand;
 import org.apache.axis2.clustering.control.GetConfigurationCommand;
 import org.apache.axis2.clustering.control.GetStateCommand;
+import org.apache.axis2.clustering.control.wka.JoinGroupCommand;
 import org.apache.axis2.context.ConfigurationContext;
 import org.apache.axis2.description.HandlerDescription;
 import org.apache.axis2.description.Parameter;
@@ -78,6 +79,7 @@
  */
 public class TribesClusterManager implements ClusterManager {
     public static final int MSG_ORDER_OPTION = 512;
+
     private static final Log log = LogFactory.getLog(TribesClusterManager.class);
 
     private DefaultConfigurationManager configurationManager;
@@ -169,6 +171,24 @@
         log.info("Local Member " + TribesUtil.getLocalHost(channel));
         TribesUtil.printMembers(membershipManager);
 
+        // If a WKA scheme is used, JOIN the group and get the member list
+        if (membershipScheme.equals(ClusteringConstants.MembershipScheme.WKA_BASED)) {
+            try {
+                Response[] responses = rpcChannel.send(membershipManager.getMembers(),
+                                                       new JoinGroupCommand(),
+                                                       RpcChannel.FIRST_REPLY,
+                                                       Channel.SEND_OPTIONS_ASYNCHRONOUS,
+                                                       10000);
+                if (responses.length > 0) {
+                    ((ControlCommand) responses[0].getMessage()).execute(configurationContext); // Do the initialization
+                }
+            } catch (ChannelException e) {
+                String msg = "Could not JOIN group";
+                log.error(e);
+                throw new ClusteringFault(msg, e);
+            }
+        }
+
         // If configuration management is enabled, get the latest config from a neighbour
         if (configurationManager != null) {
             configurationManager.setSender(channelSender);
@@ -362,7 +382,9 @@
             } catch (IOException e) {
                 String msg =
                         "Could not allocate a port in the range 4000-4100 for local host " +
-                        localMember.getHostname();
+                        localMember.getHostname() +
+                        ". Check whether the IP address specified or inferred for the local " +
+                        "member is correct.";
                 log.error(msg, e);
                 throw new ClusteringFault(msg, e);
             }
@@ -434,7 +456,7 @@
     protected int getLocalPort(ServerSocket socket, String hostname,
                                int portstart, int retries) throws IOException {
         InetSocketAddress addr = null;
-        while (retries > 0) {
+        if (retries > 0) {
             try {
                 addr = new InetSocketAddress(hostname, portstart);
                 socket.bind(addr);
@@ -459,10 +481,10 @@
                 } catch (InterruptedException ignored) {
                     ignored.printStackTrace();
                 }
-                retries = getLocalPort(socket, hostname, portstart, retries);
+                getLocalPort(socket, hostname, portstart, retries);
             }
         }
-        return retries;
+        return portstart;
     }
 
     /**
@@ -505,10 +527,6 @@
         orderInterceptor.setOptionFlag(MSG_ORDER_OPTION);
         channel.addInterceptor(orderInterceptor);
 
-        // Add a AtMostOnceInterceptor to support at-most-once message processing semantics
-        AtMostOnceInterceptor atMostOnceInterceptor = new AtMostOnceInterceptor();
-        channel.addInterceptor(atMostOnceInterceptor);
-
         // Add a reliable failure detector
         TcpFailureDetector tcpFailureDetector = new TcpFailureDetector();
         tcpFailureDetector.setPrevious(dfi);
@@ -518,6 +536,10 @@
             staticMembershipInterceptor = new StaticMembershipInterceptor();
             channel.addInterceptor(staticMembershipInterceptor);
         }
+
+        // Add a AtMostOnceInterceptor to support at-most-once message processing semantics
+        AtMostOnceInterceptor atMostOnceInterceptor = new AtMostOnceInterceptor();
+        channel.addInterceptor(atMostOnceInterceptor);
     }
 
     /**

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesConstants.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesConstants.java?rev=658777&r1=658776&r2=658777&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesConstants.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesConstants.java Wed May 21 10:35:58 2008
@@ -33,4 +33,11 @@
     public static final String BIND_ADDRESS = "bindAddress";
     public static final String TCP_LISTEN_PORT = "tcpListenPort";
     public static final String MAX_RETRIES = "maxRetries";
+
+    public final class MembershipMessages {
+        public static final int JOIN = 3;
+        public static final int LEAVE = 5;
+        public static final int NEW_MEMBER_JOINED = 7;
+        public static final int MEMBER_LIST = 11;
+    }
 }