You are viewing a plain text version of this content. The canonical link for it is here.
Posted to java-dev@axis.apache.org by az...@apache.org on 2008/06/23 17:56:47 UTC

svn commit: r670664 - /webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java

Author: azeez
Date: Mon Jun 23 08:56:46 2008
New Revision: 670664

URL: http://svn.apache.org/viewvc?rev=670664&view=rev
Log:
If the port specified in the axis2.xml file is unavailable, we need to automatically figure out a port, and this port needs to be set on the local member before that member is registered with the static membership interceptor.

Modified:
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java?rev=670664&r1=670663&r2=670664&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java Mon Jun 23 08:56:46 2008
@@ -475,28 +475,30 @@
 
         Parameter localPort = getParameter(TribesConstants.LOCAL_MEMBER_PORT);
         int port;
-        if (localPort != null) {
-            port = Integer.parseInt(((String) localPort.getValue()).trim());
-        } else { // In cases where the localport needs to be automatically figured out
-            try {
-                port = getLocalPort(new ServerSocket(), localMember.getHostname(), 4000, 100);
-            } catch (IOException e) {
-                String msg =
-                        "Could not allocate a port in the range 4000-4100 for local host " +
-                        localMember.getHostname() +
-                        ". Check whether the IP address specified or inferred for the local " +
-                        "member is correct.";
-                log.error(msg, e);
-                throw new ClusteringFault(msg, e);
+        try {
+            if (localPort != null) {
+                port = Integer.parseInt(((String) localPort.getValue()).trim());
+                port = getLocalPort(new ServerSocket(), localMember.getHostname(), port, 4000, 100);
+            } else { // In cases where the localport needs to be automatically figured out
+                port = getLocalPort(new ServerSocket(), localMember.getHostname(), -1, 4000, 100);
             }
+        } catch (IOException e) {
+            String msg =
+                    "Could not allocate the specified port or a port in the range 4000-4100 " +
+                    "for local host " + localMember.getHostname() +
+                    ". Check whether the IP address specified or inferred for the local " +
+                    "member is correct.";
+            log.error(msg, e);
+            throw new ClusteringFault(msg, e);
         }
 
         byte[] payload = "ping".getBytes();
         localMember.setPayload(payload);
         receiver.setPort(port);
-        staticMembershipInterceptor.setLocalMember(localMember);
         localMember.setPort(port);
         localMember.setDomain(domain);
+        staticMembershipInterceptor.setLocalMember(localMember);
+
         // ------------ END: Configure and add the local member ---------------------
 
         // ------------ START: Add other members ---------------------
@@ -561,21 +563,18 @@
     }
 
     protected int getLocalPort(ServerSocket socket, String hostname,
-                               int portstart, int retries) throws IOException {
+                               int preferredPort, int portstart, int retries) throws IOException {
+        if (preferredPort != -1) {
+            try {
+                return getLocalPort(socket, hostname, preferredPort);
+            } catch (IOException ignored) {
+                // Fall through and try a default port
+            }
+        }
         InetSocketAddress addr = null;
         if (retries > 0) {
             try {
-                addr = new InetSocketAddress(hostname, portstart);
-                socket.bind(addr);
-                log.info("Receiver Server Socket bound to:" + addr);
-                socket.setSoTimeout(5);
-                socket.close();
-                try {
-                    Thread.sleep(100);
-                } catch (InterruptedException ignored) {
-                    ignored.printStackTrace();
-                }
-                return portstart;
+                return getLocalPort(socket, hostname, portstart);
             } catch (IOException x) {
                 retries--;
                 if (retries <= 0) {
@@ -588,12 +587,27 @@
                 } catch (InterruptedException ignored) {
                     ignored.printStackTrace();
                 }
-                getLocalPort(socket, hostname, portstart, retries);
+                getLocalPort(socket, hostname, portstart, retries, -1);
             }
         }
         return portstart;
     }
 
+    private int getLocalPort(ServerSocket socket, String hostname, int port) throws IOException {
+        InetSocketAddress addr;
+        addr = new InetSocketAddress(hostname, port);
+        socket.bind(addr);
+        log.info("Receiver Server Socket bound to:" + addr);
+        socket.setSoTimeout(5);
+        socket.close();
+        try {
+            Thread.sleep(100);
+        } catch (InterruptedException ignored) {
+            ignored.printStackTrace();
+        }
+        return port;
+    }
+
     /**
      * Add ChannelInterceptors. The order of the interceptors that are added will depend on the
      * membership management scheme
@@ -607,10 +621,16 @@
                                  byte[] domain,
                                  String membershipScheme) throws ClusteringFault {
 
+        if (log.isDebugEnabled()) {
+            log.debug("Adding Interceptors [STARTED]");
+        }
         if (membershipScheme.equals(ClusteringConstants.MembershipScheme.WKA_BASED)) {
             TcpPingInterceptor tcpPingInterceptor = new TcpPingInterceptor();
             tcpPingInterceptor.setInterval(100);
             channel.addInterceptor(tcpPingInterceptor);
+            if (log.isDebugEnabled()) {
+                log.debug("Added TCP Ping Interceptor");
+            }
         }
 
         // Add the NonBlockingCoordinator. This is used for leader election
@@ -630,6 +650,9 @@
 //        tcpFailureDetector.setReadTestTimeout(30000);
         tcpFailureDetector.setConnectTimeout(30000);
         channel.addInterceptor(tcpFailureDetector);
+        if (log.isDebugEnabled()) {
+            log.debug("Added TCP Failure Detector");
+        }
 
         // Add a DomainFilterInterceptor
         channel.getMembershipService().setDomain(domain);
@@ -637,25 +660,44 @@
             DomainFilterInterceptor dfi = new DomainFilterInterceptor();
             dfi.setDomain(domain);
             channel.addInterceptor(dfi);
+            if (log.isDebugEnabled()) {
+                log.debug("Added Domain Filter Interceptor");
+            }
         } else {
             LoadBalancerInterceptor lbInterceptor =
                     new LoadBalancerInterceptor(domain, lbEventHandlers);
             channel.addInterceptor(lbInterceptor);
+            if (log.isDebugEnabled()) {
+                log.debug("Added Load Balancer Interceptor");
+            }
         }
 
         // Add a AtMostOnceInterceptor to support at-most-once message processing semantics
         AtMostOnceInterceptor atMostOnceInterceptor = new AtMostOnceInterceptor();
         atMostOnceInterceptor.setOptionFlag(TribesConstants.AT_MOST_ONCE_OPTION);
         channel.addInterceptor(atMostOnceInterceptor);
+        if (log.isDebugEnabled()) {
+            log.debug("Added At-most-once Interceptor");
+        }
 
         // Add the OrderInterceptor to preserve sender ordering
         OrderInterceptor orderInterceptor = new OrderInterceptor();
         orderInterceptor.setOptionFlag(TribesConstants.MSG_ORDER_OPTION);
         channel.addInterceptor(orderInterceptor);
+        if (log.isDebugEnabled()) {
+            log.debug("Added Message Order Interceptor");
+        }
 
         if (membershipScheme.equals(ClusteringConstants.MembershipScheme.WKA_BASED)) {
             staticMembershipInterceptor = new StaticMembershipInterceptor();
             channel.addInterceptor(staticMembershipInterceptor);
+            if (log.isDebugEnabled()) {
+                log.debug("Added Static Membership Interceptor");
+            }
+        }
+
+        if (log.isDebugEnabled()) {
+            log.debug("Adding Interceptors [COMPLETED]");
         }
     }