You are viewing a plain text version of this content. The canonical link for it is here.
Posted to java-dev@axis.apache.org by az...@apache.org on 2008/06/25 18:42:20 UTC

svn commit: r671600 [2/2] - in /webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering: ./ control/wka/ tribes/

Added: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/WkaBasedMembershipScheme.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/WkaBasedMembershipScheme.java?rev=671600&view=auto
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/WkaBasedMembershipScheme.java (added)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/WkaBasedMembershipScheme.java Wed Jun 25 09:42:19 2008
@@ -0,0 +1,503 @@
+/*                                                                             
+ * Copyright 2004,2005 The Apache Software Foundation.                         
+ *                                                                             
+ * Licensed under the Apache License, Version 2.0 (the "License");             
+ * you may not use this file except in compliance with the License.            
+ * You may obtain a copy of the License at                                     
+ *                                                                             
+ *      http://www.apache.org/licenses/LICENSE-2.0                             
+ *                                                                             
+ * Unless required by applicable law or agreed to in writing, software         
+ * distributed under the License is distributed on an "AS IS" BASIS,           
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.    
+ * See the License for the specific language governing permissions and         
+ * limitations under the License.                                              
+ */
+package org.apache.axis2.clustering.tribes;
+
+import org.apache.axis2.clustering.ClusteringFault;
+import org.apache.axis2.clustering.Member;
+import org.apache.axis2.clustering.MembershipScheme;
+import org.apache.axis2.clustering.control.wka.JoinGroupCommand;
+import org.apache.axis2.clustering.control.wka.MemberJoinedCommand;
+import org.apache.axis2.clustering.control.wka.MemberListCommand;
+import org.apache.axis2.description.Parameter;
+import org.apache.axis2.util.Utils;
+import org.apache.catalina.tribes.Channel;
+import org.apache.catalina.tribes.ChannelException;
+import org.apache.catalina.tribes.ManagedChannel;
+import org.apache.catalina.tribes.RemoteProcessException;
+import org.apache.catalina.tribes.group.Response;
+import org.apache.catalina.tribes.group.RpcCallback;
+import org.apache.catalina.tribes.group.RpcChannel;
+import org.apache.catalina.tribes.group.interceptors.OrderInterceptor;
+import org.apache.catalina.tribes.group.interceptors.StaticMembershipInterceptor;
+import org.apache.catalina.tribes.group.interceptors.TcpFailureDetector;
+import org.apache.catalina.tribes.group.interceptors.TcpPingInterceptor;
+import org.apache.catalina.tribes.membership.StaticMember;
+import org.apache.catalina.tribes.transport.ReceiverBase;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.SocketAddress;
+import java.net.SocketException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Implementation of the WKA(well-known address) based membership scheme. In this scheme,
+ * membership is discovered using a few well-known members (who run at well-known IP addresses)
+ */
+public class WkaBasedMembershipScheme implements MembershipScheme {
+
+    private static final Log log = LogFactory.getLog(WkaBasedMembershipScheme.class);
+
+    /**
+     * The Tribes channel
+     */
+    private ManagedChannel channel;
+    private RpcChannel rpcChannel;
+    private MembershipManager primaryMembershipManager;
+    private List<MembershipManager> applicationDomainMembershipManagers;
+    private StaticMembershipInterceptor staticMembershipInterceptor;
+    private Map<String, Parameter> parameters;
+
+    /**
+     * The loadBalancerDomain to which the members belong to
+     */
+    private byte[] domain;
+
+    /**
+     * The static(well-known) members
+     */
+    private List<Member> members;
+
+    /**
+     * The mode in which this member operates such as "loadBalance" or "application"
+     */
+    private Mode mode;
+
+    public WkaBasedMembershipScheme(ManagedChannel channel,
+                                    Mode mode,
+                                    List<MembershipManager> applicationDomainMembershipManagers,
+                                    RpcChannel rpcChannel,
+                                    MembershipManager primaryMembershipManager,
+                                    Map<String, Parameter> parameters,
+                                    byte[] domain,
+                                    List<Member> members) {
+        this.channel = channel;
+        this.mode = mode;
+        this.applicationDomainMembershipManagers = applicationDomainMembershipManagers;
+        this.rpcChannel = rpcChannel;
+        this.primaryMembershipManager = primaryMembershipManager;
+        this.parameters = parameters;
+        this.domain = domain;
+        this.members = members;
+    }
+
+    /**
+     * Configure the membership related to the WKA based scheme
+     *
+     * @throws org.apache.axis2.clustering.ClusteringFault
+     *          If an error occurs while configuring this scheme
+     */
+    public void init() throws ClusteringFault {
+        addInterceptors();
+        configureStaticMembership();
+    }
+
+    private void configureStaticMembership() throws ClusteringFault {
+        channel.setMembershipService(new WkaMembershipService(primaryMembershipManager));
+        StaticMember localMember = new StaticMember();
+        primaryMembershipManager.setLocalMember(localMember);
+        ReceiverBase receiver = (ReceiverBase) channel.getChannelReceiver();
+
+        // ------------ START: Configure and add the local member ---------------------
+        Parameter localHost = getParameter(TribesConstants.LOCAL_MEMBER_HOST);
+        String host;
+        if (localHost != null) {
+            host = ((String) localHost.getValue()).trim();
+        } else { // In cases where the localhost needs to be automatically figured out
+            try {
+                try {
+                    host = Utils.getIpAddress();
+                } catch (SocketException e) {
+                    String msg = "Could not get local IP address";
+                    log.error(msg, e);
+                    throw new ClusteringFault(msg, e);
+                }
+            } catch (Exception e) {
+                String msg = "Could not get the localhost name";
+                log.error(msg, e);
+                throw new ClusteringFault(msg, e);
+            }
+        }
+        receiver.setAddress(host);
+        try {
+            localMember.setHostname(host);
+        } catch (IOException e) {
+            String msg = "Could not set the local member's name";
+            log.error(msg, e);
+            throw new ClusteringFault(msg, e);
+        }
+
+        Parameter localPort = getParameter(TribesConstants.LOCAL_MEMBER_PORT);
+        int port;
+        try {
+            if (localPort != null) {
+                port = Integer.parseInt(((String) localPort.getValue()).trim());
+                port = getLocalPort(new ServerSocket(), localMember.getHostname(), port, 4000, 100);
+            } else { // In cases where the localport needs to be automatically figured out
+                port = getLocalPort(new ServerSocket(), localMember.getHostname(), -1, 4000, 100);
+            }
+        } catch (IOException e) {
+            String msg =
+                    "Could not allocate the specified port or a port in the range 4000-4100 " +
+                    "for local host " + localMember.getHostname() +
+                    ". Check whether the IP address specified or inferred for the local " +
+                    "member is correct.";
+            log.error(msg, e);
+            throw new ClusteringFault(msg, e);
+        }
+
+        byte[] payload = "ping".getBytes();
+        localMember.setPayload(payload);
+        receiver.setPort(port);
+        localMember.setPort(port);
+        localMember.setDomain(domain);
+        staticMembershipInterceptor.setLocalMember(localMember);
+
+        // ------------ END: Configure and add the local member ---------------------
+
+        // ------------ START: Add other members ---------------------
+        for (Member member : members) {
+            StaticMember tribesMember;
+            try {
+                tribesMember = new StaticMember(member.getHostName(), member.getPort(),
+                                                0, payload);
+            } catch (IOException e) {
+                String msg = "Could not add static member " +
+                             member.getHostName() + ":" + member.getPort();
+                log.error(msg, e);
+                throw new ClusteringFault(msg, e);
+            }
+
+            // Do not add the local member to the list of members
+            if (!(Arrays.equals(localMember.getHost(), tribesMember.getHost()) &&
+                  localMember.getPort() == tribesMember.getPort())) {
+                tribesMember.setDomain(domain);
+
+                // We will add the member even if it is offline at this moment. When the
+                // member comes online, it will be detected by the GMS
+                staticMembershipInterceptor.addStaticMember(tribesMember);
+                primaryMembershipManager.addWellKnownMember(tribesMember);
+                if (canConnect(member)) {
+                    primaryMembershipManager.memberAdded(tribesMember);
+                    log.info("Added static member " + TribesUtil.getName(tribesMember));
+                } else {
+                    log.info("Could not connect to member " + TribesUtil.getName(tribesMember));
+                }
+            }
+        }
+    }
+
+    /**
+     * Before adding a static member, we will try to verify whether we can connect to it
+     *
+     * @param member The member whose connectvity needs to be verified
+     * @return true, if the member can be contacted; false, otherwise.
+     */
+    private boolean canConnect(org.apache.axis2.clustering.Member member) {
+        for (int retries = 5; retries > 0; retries--) {
+            try {
+                InetAddress addr = InetAddress.getByName(member.getHostName());
+                SocketAddress sockaddr = new InetSocketAddress(addr,
+                                                               member.getPort());
+                new Socket().connect(sockaddr, 500);
+                return true;
+            } catch (IOException e) {
+                String msg = e.getMessage();
+                if (msg.indexOf("Connection refused") == -1 && msg.indexOf("connect timed out") == -1) {
+                    log.error("Cannot connect to member " +
+                              member.getHostName() + ":" + member.getPort(), e);
+                }
+            }
+        }
+        return false;
+    }
+
+    protected int getLocalPort(ServerSocket socket, String hostname,
+                               int preferredPort, int portstart, int retries) throws IOException {
+        if (preferredPort != -1) {
+            try {
+                return getLocalPort(socket, hostname, preferredPort);
+            } catch (IOException ignored) {
+                // Fall through and try a default port
+            }
+        }
+        InetSocketAddress addr = null;
+        if (retries > 0) {
+            try {
+                return getLocalPort(socket, hostname, portstart);
+            } catch (IOException x) {
+                retries--;
+                if (retries <= 0) {
+                    log.error("Unable to bind server socket to:" + addr + " throwing error.");
+                    throw x;
+                }
+                portstart++;
+                try {
+                    Thread.sleep(50);
+                } catch (InterruptedException ignored) {
+                    ignored.printStackTrace();
+                }
+                getLocalPort(socket, hostname, portstart, retries, -1);
+            }
+        }
+        return portstart;
+    }
+
+    private int getLocalPort(ServerSocket socket, String hostname, int port) throws IOException {
+        InetSocketAddress addr;
+        addr = new InetSocketAddress(hostname, port);
+        socket.bind(addr);
+        log.info("Receiver Server Socket bound to:" + addr);
+        socket.setSoTimeout(5);
+        socket.close();
+        try {
+            Thread.sleep(100);
+        } catch (InterruptedException ignored) {
+            ignored.printStackTrace();
+        }
+        return port;
+    }
+
+    /**
+     * Add ChannelInterceptors. The order of the interceptors that are added will depend on the
+     * membership management scheme
+     */
+    private void addInterceptors() {
+
+        if (log.isDebugEnabled()) {
+            log.debug("Adding Interceptors...");
+        }
+        TcpPingInterceptor tcpPingInterceptor = new TcpPingInterceptor();
+        tcpPingInterceptor.setInterval(100);
+        channel.addInterceptor(tcpPingInterceptor);
+        if (log.isDebugEnabled()) {
+            log.debug("Added TCP Ping Interceptor");
+        }
+
+        // Add the NonBlockingCoordinator. This is used for leader election
+        /*nbc = new NonBlockingCoordinator() {
+            public void fireInterceptorEvent(InterceptorEvent event) {
+                String status = event.getEventTypeDesc();
+                System.err.println("$$$$$$$$$$$$ NBC status=" + status);
+                int type = event.getEventType();
+            }
+        };
+        nbc.setPrevious(dfi);
+        channel.addInterceptor(nbc);*/
+
+        // Add a reliable failure detector
+        TcpFailureDetector tcpFailureDetector = new TcpFailureDetector();
+//        tcpFailureDetector.setPrevious(dfi); //TODO: check this
+//        tcpFailureDetector.setReadTestTimeout(30000);
+        tcpFailureDetector.setConnectTimeout(30000);
+        channel.addInterceptor(tcpFailureDetector);
+        if (log.isDebugEnabled()) {
+            log.debug("Added TCP Failure Detector");
+        }
+
+        staticMembershipInterceptor = new StaticMembershipInterceptor();
+        staticMembershipInterceptor.setLocalMember(primaryMembershipManager.getLocalMember());
+        primaryMembershipManager.setStaticMembershipInterceptor(staticMembershipInterceptor);
+        channel.addInterceptor(staticMembershipInterceptor);
+        if (log.isDebugEnabled()) {
+            log.debug("Added Static Membership Interceptor");
+        }
+
+        channel.getMembershipService().setDomain(domain);
+        mode.addInterceptors(channel);
+
+        // Add a AtMostOnceInterceptor to support at-most-once message processing semantics
+        AtMostOnceInterceptor atMostOnceInterceptor = new AtMostOnceInterceptor();
+        atMostOnceInterceptor.setOptionFlag(TribesConstants.AT_MOST_ONCE_OPTION);
+        channel.addInterceptor(atMostOnceInterceptor);
+        if (log.isDebugEnabled()) {
+            log.debug("Added At-most-once Interceptor");
+        }
+
+        // Add the OrderInterceptor to preserve sender ordering
+        OrderInterceptor orderInterceptor = new OrderInterceptor();
+        orderInterceptor.setOptionFlag(TribesConstants.MSG_ORDER_OPTION);
+        channel.addInterceptor(orderInterceptor);
+        if (log.isDebugEnabled()) {
+            log.debug("Added Message Order Interceptor");
+        }
+    }
+
+    /**
+     * JOIN the group and get the member list
+     *
+     * @throws ClusteringFault If an error occurs while joining the group
+     */
+    public void joinGroup() throws ClusteringFault {
+
+        // Have multiple RPC channels with multiple RPC request handlers for each domain
+        // This is needed only when this member is running as a load balancer
+        for (MembershipManager appDomainMembershipManager : applicationDomainMembershipManagers) {
+
+            // Create an RpcChannel for each domain
+            String domain = new String(appDomainMembershipManager.getDomain());
+            new RpcChannel(domain.getBytes(),
+                           channel,
+                           new RpcRequestHandler(appDomainMembershipManager));
+            if(log.isDebugEnabled()){
+                log.debug("Created RPC Channel for application domain " + domain);
+            }
+        }
+
+        // Send JOIN message to a WKA member
+        if (primaryMembershipManager.getMembers().length > 0) {
+            log.info("Sending JOIN message to WKA members...");
+            org.apache.catalina.tribes.Member[] wkaMembers = primaryMembershipManager.getMembers(); // The well-known members
+            try {
+                Thread.sleep(3000); // Wait for sometime so that the WKA members can receive the MEMBER_LIST message, if they have just joined the group
+            } catch (InterruptedException ignored) {
+            }
+            Response[] responses = null;
+            do {
+                try {
+                    responses = rpcChannel.send(wkaMembers,
+                                                new JoinGroupCommand(),
+                                                RpcChannel.ALL_REPLY,
+                                                Channel.SEND_OPTIONS_ASYNCHRONOUS |
+                                                TribesConstants.MEMBERSHIP_MSG_OPTION,
+                                                10000);
+                    if (responses.length == 0) {
+                        try {
+                            Thread.sleep(500);
+                        } catch (InterruptedException ignored) {
+                        }
+                    }
+                } catch (Exception e) {
+                    String msg = "Error occurred while trying to send JOIN request to WKA members";
+                    log.error(msg, e);
+                    wkaMembers = primaryMembershipManager.getMembers();
+                    if (wkaMembers.length == 0) {
+                        log.warn("There are no well-known members");
+                        break;
+                    }
+                }
+
+                // TODO: If we do not get a response within some time, try to recover from this fault
+            }
+            while (responses == null || responses.length == 0);  // Wait until we've received at least one response
+
+            //TODO: ######## If this node is a LB, it needs to get the entire domain to member-list map
+
+            for (Response response : responses) {
+                MemberListCommand command = (MemberListCommand) response.getMessage();
+                command.setMembershipManager(primaryMembershipManager);
+                command.execute(null); // Set the list of current members
+
+                // If the WKA member is not part of this group, remove it
+                if (!Arrays.equals(response.getSource().getDomain(),
+                                   primaryMembershipManager.getLocalMember().getDomain())) {
+                    primaryMembershipManager.memberDisappeared(response.getSource());
+                    if(log.isDebugEnabled()){
+                        log.debug("Removed member " + TribesUtil.getName(response.getSource()) + 
+                                  " since it does not belong to the local domain " +
+                                  new String(primaryMembershipManager.getLocalMember().getDomain()));
+                    }
+                }
+            }
+
+            // Send MEMBER_JOINE to the group
+            if (primaryMembershipManager.getMembers().length > 0) {
+                log.info("Sending MEMBER_JOINED to group...");
+                MemberJoinedCommand memberJoinedCommand = new MemberJoinedCommand();
+                memberJoinedCommand.setMember(primaryMembershipManager.getLocalMember());
+                try {
+                    rpcChannel.send(primaryMembershipManager.getMembers(),
+                                    memberJoinedCommand,
+                                    RpcChannel.ALL_REPLY,
+                                    Channel.SEND_OPTIONS_ASYNCHRONOUS |
+                                    TribesConstants.MEMBERSHIP_MSG_OPTION,
+                                    10000);
+                } catch (ChannelException e) {
+                    String msg = "Could not send MEMBER_JOINED message to group";
+                    log.error(msg, e);
+                    throw new ClusteringFault(msg, e);
+                }
+            }
+        }
+    }
+
+    public Parameter getParameter(String name) {
+        return parameters.get(name);
+    }
+
+    private class RpcRequestHandler implements RpcCallback {
+
+        private MembershipManager membershipManager;  //TODO: ############# Will need to inform about membership when a WKA member who is a LB joins
+
+        private RpcRequestHandler(MembershipManager membershipManager) {
+            this.membershipManager = membershipManager;
+            membershipManager.setStaticMembershipInterceptor(staticMembershipInterceptor);
+        }
+
+        public Serializable replyRequest(Serializable msg, org.apache.catalina.tribes.Member sender) {
+            String domain = new String(sender.getDomain());
+            if(log.isDebugEnabled()){
+                log.debug("Request received by RpcRequestHandler for domain " + domain);
+            }
+            if (msg instanceof JoinGroupCommand) {
+                log.info("Received JOIN message from application member " +
+                         TribesUtil.getName(sender) + " in domain " + domain);
+                // Return the list of current members to the caller
+                MemberListCommand memListCmd = new MemberListCommand();
+                memListCmd.setMembers(membershipManager.getMembers());
+
+                membershipManager.memberAdded(sender);
+                return memListCmd;
+            } else if (msg instanceof MemberJoinedCommand) {
+                log.info("Received MEMBER_JOINED message from application member " +
+                         TribesUtil.getName(sender) + " in domain " + domain);
+                try {
+                    MemberJoinedCommand command = (MemberJoinedCommand) msg;
+                    command.setMembershipManager(membershipManager);
+                    command.execute(null);
+                } catch (ClusteringFault e) {
+                    String errMsg = "Cannot handle MEMBER_JOINED notification";
+                    log.error(errMsg, e);
+                    throw new RemoteProcessException(errMsg, e);
+                }
+            } else if (msg instanceof MemberListCommand) {
+                try {                    //TODO: What if we receive more than one member list message?
+                    MemberListCommand command = (MemberListCommand) msg;
+                    command.setMembershipManager(membershipManager);
+                    command.execute(null);
+
+                    //TODO Send MEMBER_JOINED messages to all nodes
+                } catch (ClusteringFault e) {
+                    String errMsg = "Cannot handle MEMBER_LIST message";
+                    log.error(errMsg, e);
+                    throw new RemoteProcessException(errMsg, e);
+                }
+            }
+            return null;
+        }
+
+        public void leftOver(Serializable msg, org.apache.catalina.tribes.Member sender) {
+        }
+    }
+}