You are viewing a plain text version of this content. The canonical link for it is here.
Posted to java-dev@axis.apache.org by az...@apache.org on 2008/06/15 10:48:06 UTC

svn commit: r667933 - in /webservices/axis2/trunk/java/modules: clustering/src/org/apache/axis2/clustering/ clustering/src/org/apache/axis2/clustering/tribes/ kernel/src/org/apache/axis2/clustering/

Author: azeez
Date: Sun Jun 15 01:48:05 2008
New Revision: 667933

URL: http://svn.apache.org/viewvc?rev=667933&view=rev
Log:
Improvements to load balancing implementation. Now it works nicely when combined with the Synapse dynamic load balancing implementation.


Modified:
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/DefaultLoadBalanceEventHandler.java
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/LoadBalancerInterceptor.java
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java
    webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/ClusterManager.java
    webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/LoadBalanceEventHandler.java
    webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/Member.java

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/DefaultLoadBalanceEventHandler.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/DefaultLoadBalanceEventHandler.java?rev=667933&r1=667932&r2=667933&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/DefaultLoadBalanceEventHandler.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/DefaultLoadBalanceEventHandler.java Sun Jun 15 01:48:05 2008
@@ -18,24 +18,88 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
-import java.util.List;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketAddress;
 import java.util.ArrayList;
+import java.util.List;
 
 /**
  * The default, dummy implementation of {@link LoadBalanceEventHandler}
  */
-public class DefaultLoadBalanceEventHandler implements LoadBalanceEventHandler{
+public class DefaultLoadBalanceEventHandler implements LoadBalanceEventHandler {
 
-    private static final Log log  = LogFactory.getLog(DefaultLoadBalanceEventHandler.class);
+    private static final Log log = LogFactory.getLog(DefaultLoadBalanceEventHandler.class);
     private List<Member> members = new ArrayList<Member>();
 
     public void applicationMemberAdded(Member member) {
-        log.info("Application member " + member + " joined cluster.");
-        members.add(member);
+        Thread th = new Thread(new MemberAdder(member));
+        th.setPriority(Thread.MAX_PRIORITY);
+        th.start();
     }
 
     public void applicationMemberRemoved(Member member) {
         log.info("Application member " + member + " left cluster.");
         members.remove(member);
     }
+
+    public List<Member> getMembers() {
+        return members;
+    }
+
+    private class MemberAdder implements Runnable {
+
+        private final Member member;
+
+        private MemberAdder(Member member) {
+            this.member = member;
+        }
+
+        public void run() {
+            if (!members.contains(member) && canConnect(member)) {
+                //                try
+                //                    Thread.sleep(10000);   // Sleep for some time to allow complete initialization of the node
+                //                } catch (InterruptedException e) {
+                //                    e.printStackTrace();
+                //                }
+                members.add(member);
+                log.info("Application member " + member + " joined application cluster");
+            } else {
+                log.error("Could not add application member " + member);
+            }
+        }
+
+        /**
+         * Before adding a member, we will try to verify whether we can connect to it
+         *
+         * @param member The member whose connectivity needs to be verified
+         * @return true, if the member can be contacted; false, otherwise.
+         */
+        private boolean canConnect(Member member) {
+            for (int retries = 30; retries > 0; retries--) {
+                try {
+                    InetAddress addr = InetAddress.getByName(member.getHostName());
+                    SocketAddress httpSockaddr = new InetSocketAddress(addr,
+                                                                       member.getHttpPort());
+                    new Socket().connect(httpSockaddr, 10000);
+                    SocketAddress httpsSockaddr = new InetSocketAddress(addr,
+                                                                        member.getHttpsPort());
+                    new Socket().connect(httpsSockaddr, 10000);
+                    return true;
+                } catch (IOException e) {
+                    String msg = e.getMessage();
+                    if (msg.indexOf("Connection refused") == -1 && msg.indexOf("connect timed out") == -1) {
+                        log.error("Cannot connect to member " + member, e);
+                    }
+                    try {
+                        Thread.sleep(1000);
+                    } catch (InterruptedException ignored) {
+                    }
+                }
+            }
+            return false;
+        }
+    }
 }

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/LoadBalancerInterceptor.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/LoadBalancerInterceptor.java?rev=667933&r1=667932&r2=667933&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/LoadBalancerInterceptor.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/LoadBalancerInterceptor.java Sun Jun 15 01:48:05 2008
@@ -52,15 +52,15 @@
      * Represents the group in which the applications being load balanced, are deployed and their
      * respective load balancer event handlers
      */
-    private Map<byte[], LoadBalanceEventHandler> lbEventHandlers;
+    private Map<String, LoadBalanceEventHandler> lbEventHandlers;
 
     public LoadBalancerInterceptor(byte[] loadBalancerDomain,
-                                   Map<byte[], LoadBalanceEventHandler> lbEventHandlers) {
+                                   Map<String, LoadBalanceEventHandler> lbEventHandlers) {
         this.loadBalancerDomain = loadBalancerDomain;
         this.lbEventHandlers = lbEventHandlers;
     }
 
-    public void setLbEventHandlers(Map<byte[], LoadBalanceEventHandler> lbEventHandlers) {
+    public void setLbEventHandlers(Map<String, LoadBalanceEventHandler> lbEventHandlers) {
         this.lbEventHandlers = lbEventHandlers;
     }
 
@@ -89,10 +89,10 @@
         }
 
         // Is this an application domain member?
-        for (byte[] applicationDomain : lbEventHandlers.keySet()) {
-            if (Arrays.equals(applicationDomain, member.getDomain())) {
+        for (String applicationDomain : lbEventHandlers.keySet()) {
+            if (Arrays.equals(applicationDomain.getBytes(), member.getDomain())) {
                 log.info("Application member " + TribesUtil.getName(member) + " joined group " +
-                         new String(applicationDomain));
+                         applicationDomain);
                 LoadBalanceEventHandler eventHandler = lbEventHandlers.get(applicationDomain);
                 if (eventHandler != null) {
                     eventHandler.applicationMemberAdded(toAxis2Member(member));
@@ -140,10 +140,10 @@
         }
 
         // Is this an application domain member?
-        for (byte[] applicationDomain : lbEventHandlers.keySet()) {
-            if (Arrays.equals(applicationDomain, member.getDomain())) {
+        for (String applicationDomain : lbEventHandlers.keySet()) {
+            if (Arrays.equals(applicationDomain.getBytes(), member.getDomain())) {
                 log.info("Application member " + TribesUtil.getName(member) + " left group " +
-                         new String(applicationDomain));
+                         applicationDomain);
                 LoadBalanceEventHandler eventHandler = lbEventHandlers.get(applicationDomain);
                 if (eventHandler != null) {
                     eventHandler.applicationMemberRemoved(toAxis2Member(member));

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java?rev=667933&r1=667932&r2=667933&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java Sun Jun 15 01:48:05 2008
@@ -77,8 +77,8 @@
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
-import java.util.Properties;
 import java.util.Map;
+import java.util.Properties;
 
 /**
  * The main ClusterManager class for the Tribes based clustering implementation
@@ -101,9 +101,8 @@
     private StaticMembershipInterceptor staticMembershipInterceptor;
     private List<org.apache.axis2.clustering.Member> members;
 
-    private Map<byte[], LoadBalanceEventHandler> lbEventHandlers =
-            new HashMap<byte[], LoadBalanceEventHandler>();
-    private LoadBalancerInterceptor lbInterceptor;
+    private final Map<String, LoadBalanceEventHandler> lbEventHandlers =
+            new HashMap<String, LoadBalanceEventHandler>();
     private boolean loadBalanceMode;
 
     public TribesClusterManager() {
@@ -122,10 +121,14 @@
                                            String applicationDomain) {
         log.info("Load balancing for application domain " + applicationDomain +
                  " using event handler " + eventHandler.getClass());
-        lbEventHandlers.put(applicationDomain.getBytes(), eventHandler);
+        lbEventHandlers.put(applicationDomain, eventHandler);
         loadBalanceMode = true;
     }
 
+    public LoadBalanceEventHandler getLoadBalanceEventHandler(String applicationDomain) {
+        return lbEventHandlers.get(applicationDomain);
+    }
+
     public ContextManager getContextManager() {
         return contextManager;
     }
@@ -595,9 +598,9 @@
      * Add ChannelInterceptors. The order of the interceptors that are added will depend on the
      * membership management scheme
      *
-     * @param channel           The Tribes channel
-     * @param domain            The loadBalancerDomain to which this node belongs to
-     * @param membershipScheme  The membership scheme. Only wka & multicast are valid values.
+     * @param channel          The Tribes channel
+     * @param domain           The loadBalancerDomain to which this node belongs
+     * @param membershipScheme The membership scheme. Only wka & multicast are valid values.
      * @throws ClusteringFault If an error occurs while adding interceptors
      */
     private void addInterceptors(ManagedChannel channel,
@@ -635,7 +638,7 @@
             dfi.setDomain(domain);
             channel.addInterceptor(dfi);
         } else {
-            lbInterceptor =
+            LoadBalancerInterceptor lbInterceptor =
                     new LoadBalancerInterceptor(domain, lbEventHandlers);
             channel.addInterceptor(lbInterceptor);
         }
@@ -774,7 +777,8 @@
                             }
                         }
                         // TODO: If we do not get a response within some time, try to recover from this fault
-                    } while (responses.length == 0 || responses[0] == null || responses[0].getMessage() == null);
+                    }
+                    while (responses.length == 0 || responses[0] == null || responses[0].getMessage() == null);
                     ((ControlCommand) responses[0].getMessage()).execute(configurationContext); // Do the initialization
                     break;
                 }

Modified: webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/ClusterManager.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/ClusterManager.java?rev=667933&r1=667932&r2=667933&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/ClusterManager.java (original)
+++ webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/ClusterManager.java Sun Jun 15 01:48:05 2008
@@ -159,5 +159,13 @@
      */
     void addLoadBalanceEventHandler(LoadBalanceEventHandler eventHandler, String applicationDomain);
 
-
+    /**
+     * Get the LoadBalanceEventHandler which corresponds to the <code>applicationDomain</code>.
+     * This will be valid only when this node is running in loadBalance mode
+     *
+     * @param applicationDomain The application domain to which the application nodes being
+     *                          load balanced belong
+     * @return LoadBalanceEventHandler which corresponds to the <code>applicationDomain</code>
+     */
+    LoadBalanceEventHandler getLoadBalanceEventHandler(String applicationDomain);
 }
\ No newline at end of file

Modified: webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/LoadBalanceEventHandler.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/LoadBalanceEventHandler.java?rev=667933&r1=667932&r2=667933&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/LoadBalanceEventHandler.java (original)
+++ webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/LoadBalanceEventHandler.java Sun Jun 15 01:48:05 2008
@@ -15,6 +15,8 @@
  */
 package org.apache.axis2.clustering;
 
+import java.util.List;
+
 /**
  * This is the interface through which the load balancing event are notified.
  * This will only be used when this member is running in loadBalance mode. In order to do this,
@@ -36,5 +38,11 @@
      * @param member Represents the member who left
      */
     void applicationMemberRemoved(Member member);
+
+    /** Get the list of current members
+     *
+     * @return List of current members
+     */
+    List<Member> getMembers();
     
 }

Modified: webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/Member.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/Member.java?rev=667933&r1=667932&r2=667933&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/Member.java (original)
+++ webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/Member.java Sun Jun 15 01:48:05 2008
@@ -91,6 +91,7 @@
     }
 
     public String toString() {
-        return hostName + ":" + port;
+        return "Host:" + hostName + ", Port: " + port +
+               ", HTTP:" + httpPort + ", HTTPS:" + httpsPort;
     }
 }