You are viewing a plain text version of this content. The canonical link for it is here.
Posted to java-dev@axis.apache.org by az...@apache.org on 2008/06/23 17:56:47 UTC
svn commit: r670664 -
/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java
Author: azeez
Date: Mon Jun 23 08:56:46 2008
New Revision: 670664
URL: http://svn.apache.org/viewvc?rev=670664&view=rev
Log:
If the port specified in the axis2.xml file is unavailable, we need to automatically pick another port, and the chosen port needs to be set on the local member before that member is handed to the static membership interceptor.
Modified:
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java?rev=670664&r1=670663&r2=670664&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java Mon Jun 23 08:56:46 2008
@@ -475,28 +475,30 @@
Parameter localPort = getParameter(TribesConstants.LOCAL_MEMBER_PORT);
int port;
- if (localPort != null) {
- port = Integer.parseInt(((String) localPort.getValue()).trim());
- } else { // In cases where the localport needs to be automatically figured out
- try {
- port = getLocalPort(new ServerSocket(), localMember.getHostname(), 4000, 100);
- } catch (IOException e) {
- String msg =
- "Could not allocate a port in the range 4000-4100 for local host " +
- localMember.getHostname() +
- ". Check whether the IP address specified or inferred for the local " +
- "member is correct.";
- log.error(msg, e);
- throw new ClusteringFault(msg, e);
+ try {
+ if (localPort != null) {
+ port = Integer.parseInt(((String) localPort.getValue()).trim());
+ port = getLocalPort(new ServerSocket(), localMember.getHostname(), port, 4000, 100);
+ } else { // In cases where the localport needs to be automatically figured out
+ port = getLocalPort(new ServerSocket(), localMember.getHostname(), -1, 4000, 100);
}
+ } catch (IOException e) {
+ String msg =
+ "Could not allocate the specified port or a port in the range 4000-4100 " +
+ "for local host " + localMember.getHostname() +
+ ". Check whether the IP address specified or inferred for the local " +
+ "member is correct.";
+ log.error(msg, e);
+ throw new ClusteringFault(msg, e);
}
byte[] payload = "ping".getBytes();
localMember.setPayload(payload);
receiver.setPort(port);
- staticMembershipInterceptor.setLocalMember(localMember);
localMember.setPort(port);
localMember.setDomain(domain);
+ staticMembershipInterceptor.setLocalMember(localMember);
+
// ------------ END: Configure and add the local member ---------------------
// ------------ START: Add other members ---------------------
@@ -561,21 +563,18 @@
}
protected int getLocalPort(ServerSocket socket, String hostname,
- int portstart, int retries) throws IOException {
+ int preferredPort, int portstart, int retries) throws IOException {
+ if (preferredPort != -1) {
+ try {
+ return getLocalPort(socket, hostname, preferredPort);
+ } catch (IOException ignored) {
+ // Fall through and try a default port
+ }
+ }
InetSocketAddress addr = null;
if (retries > 0) {
try {
- addr = new InetSocketAddress(hostname, portstart);
- socket.bind(addr);
- log.info("Receiver Server Socket bound to:" + addr);
- socket.setSoTimeout(5);
- socket.close();
- try {
- Thread.sleep(100);
- } catch (InterruptedException ignored) {
- ignored.printStackTrace();
- }
- return portstart;
+ return getLocalPort(socket, hostname, portstart);
} catch (IOException x) {
retries--;
if (retries <= 0) {
@@ -588,12 +587,27 @@
} catch (InterruptedException ignored) {
ignored.printStackTrace();
}
- getLocalPort(socket, hostname, portstart, retries);
+ getLocalPort(socket, hostname, portstart, retries, -1);
}
}
return portstart;
}
+ private int getLocalPort(ServerSocket socket, String hostname, int port) throws IOException {
+ InetSocketAddress addr;
+ addr = new InetSocketAddress(hostname, port);
+ socket.bind(addr);
+ log.info("Receiver Server Socket bound to:" + addr);
+ socket.setSoTimeout(5);
+ socket.close();
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException ignored) {
+ ignored.printStackTrace();
+ }
+ return port;
+ }
+
/**
* Add ChannelInterceptors. The order of the interceptors that are added will depend on the
* membership management scheme
@@ -607,10 +621,16 @@
byte[] domain,
String membershipScheme) throws ClusteringFault {
+ if (log.isDebugEnabled()) {
+ log.debug("Adding Interceptors [STARTED]");
+ }
if (membershipScheme.equals(ClusteringConstants.MembershipScheme.WKA_BASED)) {
TcpPingInterceptor tcpPingInterceptor = new TcpPingInterceptor();
tcpPingInterceptor.setInterval(100);
channel.addInterceptor(tcpPingInterceptor);
+ if (log.isDebugEnabled()) {
+ log.debug("Added TCP Ping Interceptor");
+ }
}
// Add the NonBlockingCoordinator. This is used for leader election
@@ -630,6 +650,9 @@
// tcpFailureDetector.setReadTestTimeout(30000);
tcpFailureDetector.setConnectTimeout(30000);
channel.addInterceptor(tcpFailureDetector);
+ if (log.isDebugEnabled()) {
+ log.debug("Added TCP Failure Detector");
+ }
// Add a DomainFilterInterceptor
channel.getMembershipService().setDomain(domain);
@@ -637,25 +660,44 @@
DomainFilterInterceptor dfi = new DomainFilterInterceptor();
dfi.setDomain(domain);
channel.addInterceptor(dfi);
+ if (log.isDebugEnabled()) {
+ log.debug("Added Domain Filter Interceptor");
+ }
} else {
LoadBalancerInterceptor lbInterceptor =
new LoadBalancerInterceptor(domain, lbEventHandlers);
channel.addInterceptor(lbInterceptor);
+ if (log.isDebugEnabled()) {
+ log.debug("Added Load Balancer Interceptor");
+ }
}
// Add a AtMostOnceInterceptor to support at-most-once message processing semantics
AtMostOnceInterceptor atMostOnceInterceptor = new AtMostOnceInterceptor();
atMostOnceInterceptor.setOptionFlag(TribesConstants.AT_MOST_ONCE_OPTION);
channel.addInterceptor(atMostOnceInterceptor);
+ if (log.isDebugEnabled()) {
+ log.debug("Added At-most-once Interceptor");
+ }
// Add the OrderInterceptor to preserve sender ordering
OrderInterceptor orderInterceptor = new OrderInterceptor();
orderInterceptor.setOptionFlag(TribesConstants.MSG_ORDER_OPTION);
channel.addInterceptor(orderInterceptor);
+ if (log.isDebugEnabled()) {
+ log.debug("Added Message Order Interceptor");
+ }
if (membershipScheme.equals(ClusteringConstants.MembershipScheme.WKA_BASED)) {
staticMembershipInterceptor = new StaticMembershipInterceptor();
channel.addInterceptor(staticMembershipInterceptor);
+ if (log.isDebugEnabled()) {
+ log.debug("Added Static Membership Interceptor");
+ }
+ }
+
+ if (log.isDebugEnabled()) {
+ log.debug("Adding Interceptors [COMPLETED]");
}
}