You are viewing a plain text version of this content. The canonical link for it is here.
Posted to java-commits@axis.apache.org by az...@apache.org on 2012/03/01 12:28:59 UTC

svn commit: r1295519 - in /axis/axis2/java/core/branches/1_6/modules: clustering/src/org/apache/axis2/clustering/ clustering/src/org/apache/axis2/clustering/tribes/ kernel/src/org/apache/axis2/clustering/

Author: azeez
Date: Thu Mar  1 11:28:58 2012
New Revision: 1295519

URL: http://svn.apache.org/viewvc?rev=1295519&view=rev
Log:
Manually merging in all clustering changes from Axis2 trunk to the Axis2 1_6 branch

Added:
    axis/axis2/java/core/branches/1_6/modules/clustering/src/org/apache/axis2/clustering/tribes/Axis2GroupChannel.java
    axis/axis2/java/core/branches/1_6/modules/clustering/src/org/apache/axis2/clustering/tribes/ClassLoaderUtil.java
    axis/axis2/java/core/branches/1_6/modules/clustering/src/org/apache/axis2/clustering/tribes/RpcMessagingHandler.java
    axis/axis2/java/core/branches/1_6/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesAxisObserver.java
    axis/axis2/java/core/branches/1_6/modules/kernel/src/org/apache/axis2/clustering/ClusteringMessage.java
Modified:
    axis/axis2/java/core/branches/1_6/modules/clustering/src/org/apache/axis2/clustering/ClusteringUtils.java
    axis/axis2/java/core/branches/1_6/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java
    axis/axis2/java/core/branches/1_6/modules/clustering/src/org/apache/axis2/clustering/tribes/MembershipManager.java
    axis/axis2/java/core/branches/1_6/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusteringAgent.java
    axis/axis2/java/core/branches/1_6/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesConstants.java
    axis/axis2/java/core/branches/1_6/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesUtil.java
    axis/axis2/java/core/branches/1_6/modules/clustering/src/org/apache/axis2/clustering/tribes/WkaBasedMembershipScheme.java
    axis/axis2/java/core/branches/1_6/modules/kernel/src/org/apache/axis2/clustering/ClusteringAgent.java

Modified: axis/axis2/java/core/branches/1_6/modules/clustering/src/org/apache/axis2/clustering/ClusteringUtils.java
URL: http://svn.apache.org/viewvc/axis/axis2/java/core/branches/1_6/modules/clustering/src/org/apache/axis2/clustering/ClusteringUtils.java?rev=1295519&r1=1295518&r2=1295519&view=diff
==============================================================================
--- axis/axis2/java/core/branches/1_6/modules/clustering/src/org/apache/axis2/clustering/ClusteringUtils.java (original)
+++ axis/axis2/java/core/branches/1_6/modules/clustering/src/org/apache/axis2/clustering/ClusteringUtils.java Thu Mar  1 11:28:58 2012
@@ -61,7 +61,7 @@ public class ClusteringUtils {
                     tempDirectory + File.separator +
                     (System.currentTimeMillis() + RANDOM.nextDouble());
             if(!new File(tempDirName).mkdirs()) {
-                 throw new Exception("Could not create temp dir " + tempDirName);
+                throw new Exception("Could not create temp dir " + tempDirName);
             }
             serviceArchive = new File(tempDirName + File.separator + serviceGroupName);
             FileOutputStream out = null;

Added: axis/axis2/java/core/branches/1_6/modules/clustering/src/org/apache/axis2/clustering/tribes/Axis2GroupChannel.java
URL: http://svn.apache.org/viewvc/axis/axis2/java/core/branches/1_6/modules/clustering/src/org/apache/axis2/clustering/tribes/Axis2GroupChannel.java?rev=1295519&view=auto
==============================================================================
--- axis/axis2/java/core/branches/1_6/modules/clustering/src/org/apache/axis2/clustering/tribes/Axis2GroupChannel.java (added)
+++ axis/axis2/java/core/branches/1_6/modules/clustering/src/org/apache/axis2/clustering/tribes/Axis2GroupChannel.java Thu Mar  1 11:28:58 2012
@@ -0,0 +1,99 @@
+/*
+* Copyright 2004,2005 The Apache Software Foundation.                         
+*
+*  WSO2 Inc. licenses this file to you under the Apache License,
+*  Version 2.0 (the "License"); you may not use this file except
+*  in compliance with the License.
+*  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.axis2.clustering.tribes;
+
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.catalina.tribes.ByteMessage;
+import org.apache.catalina.tribes.ChannelListener;
+import org.apache.catalina.tribes.ChannelMessage;
+import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.RemoteProcessException;
+import org.apache.catalina.tribes.UniqueId;
+import org.apache.catalina.tribes.group.GroupChannel;
+import org.apache.catalina.tribes.group.RpcChannel;
+import org.apache.catalina.tribes.group.RpcMessage;
+import org.apache.catalina.tribes.io.XByteBuffer;
+import org.apache.catalina.tribes.util.Logs;
+
+import java.io.Serializable;
+
+/**
+ * Represents a Tribes GroupChannel. The difference between
+ * org.apache.catalina.tribes.group.GroupChannel & this class is that the proper classloaders
+ * are set before message deserialization
+ */
+public class Axis2GroupChannel extends GroupChannel{
+
+    @Override
+    public void messageReceived(ChannelMessage msg) {
+        if ( msg == null ) return;
+        try {
+            if ( Logs.MESSAGES.isTraceEnabled() ) {
+                Logs.MESSAGES.trace("GroupChannel - Received msg:" + new UniqueId(msg.getUniqueId()) +
+                                    " at " +new java.sql.Timestamp(System.currentTimeMillis())+
+                                    " from "+msg.getAddress().getName());
+            }
+
+            Serializable fwd;
+            if ( (msg.getOptions() & SEND_OPTIONS_BYTE_MESSAGE) == SEND_OPTIONS_BYTE_MESSAGE ) {
+                fwd = new ByteMessage(msg.getMessage().getBytes());
+            } else {
+                try {
+                    fwd = XByteBuffer.deserialize(msg.getMessage().getBytesDirect(), 0,
+                                                  msg.getMessage().getLength(),
+                                                  ClassLoaderUtil.getClassLoaders());
+                }catch (Exception sx) {
+                    log.error("Unable to deserialize message:"+msg,sx);
+                    return;
+                }
+            }
+            if ( Logs.MESSAGES.isTraceEnabled() ) {
+                Logs.MESSAGES.trace("GroupChannel - Receive Message:" + new UniqueId(msg.getUniqueId()) + " is " +fwd);
+            }
+
+            //get the actual member with the correct alive time
+            Member source = msg.getAddress();
+            boolean rx = false;
+            boolean delivered = false;
+            for (Object channelListener1 : channelListeners) {
+                ChannelListener channelListener = (ChannelListener) channelListener1;
+                if (channelListener != null && channelListener.accept(fwd, source)) {
+                    channelListener.messageReceived(fwd, source);
+                    delivered = true;
+                    //if the message was accepted by an RPC channel, that channel
+                    //is responsible for returning the reply, otherwise we send an absence reply
+                    if (channelListener instanceof RpcChannel) rx = true;
+                }
+            }//for
+            if ((!rx) && (fwd instanceof RpcMessage)) {
+                //if we have a message that requires a response,
+                //but none was given, send back an immediate one
+                sendNoRpcChannelReply((RpcMessage)fwd,source);
+            }
+            if ( Logs.MESSAGES.isTraceEnabled() ) {
+                Logs.MESSAGES.trace("GroupChannel delivered["+delivered+"] id:"+new UniqueId(msg.getUniqueId()));
+            }
+
+        } catch ( Exception x ) {
+            //this could be the channel listener throwing an exception, we should log it
+            //as a warning.
+            if ( log.isWarnEnabled() ) log.warn("Error receiving message:",x);
+            throw new RemoteProcessException("Exception:"+x.getMessage(),x);
+        }
+    }
+}

Modified: axis/axis2/java/core/branches/1_6/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java
URL: http://svn.apache.org/viewvc/axis/axis2/java/core/branches/1_6/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java?rev=1295519&r1=1295518&r2=1295519&view=diff
==============================================================================
--- axis/axis2/java/core/branches/1_6/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java (original)
+++ axis/axis2/java/core/branches/1_6/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java Thu Mar  1 11:28:58 2012
@@ -49,9 +49,9 @@ public class ChannelSender implements Me
         this.synchronizeAllMembers = synchronizeAllMembers;
     }
 
-    public void sendToGroup(ClusteringCommand msg,
-                            MembershipManager membershipManager,
-                            int additionalOptions) throws ClusteringFault {
+    public synchronized void sendToGroup(ClusteringCommand msg,
+                                         MembershipManager membershipManager,
+                                         int additionalOptions) throws ClusteringFault {
         if (channel == null) {
             return;
         }
@@ -65,6 +65,7 @@ public class ChannelSender implements Me
                     channel.send(members, toByteMessage(msg),
                                  Channel.SEND_OPTIONS_USE_ACK |
                                  Channel.SEND_OPTIONS_SYNCHRONIZED_ACK |
+                                 Channel.SEND_OPTIONS_BYTE_MESSAGE |
                                  TribesConstants.MSG_ORDER_OPTION |
                                  TribesConstants.AT_MOST_ONCE_OPTION |
                                  additionalOptions);
@@ -72,6 +73,7 @@ public class ChannelSender implements Me
                     channel.send(members, toByteMessage(msg),
                                  Channel.SEND_OPTIONS_ASYNCHRONOUS |
                                  TribesConstants.MSG_ORDER_OPTION |
+                                 Channel.SEND_OPTIONS_BYTE_MESSAGE |
                                  TribesConstants.AT_MOST_ONCE_OPTION |
                                  additionalOptions);
                 }
@@ -100,7 +102,7 @@ public class ChannelSender implements Me
     }
 
     public void sendToGroup(ClusteringCommand msg) throws ClusteringFault {
-         sendToGroup(msg, this.membershipManager, 0);
+        sendToGroup(msg, this.membershipManager, 0);
     }
 
     private ByteMessage toByteMessage(ClusteringCommand msg) throws IOException {
@@ -119,7 +121,8 @@ public class ChannelSender implements Me
         try {
             channel.send(new Member[]{channel.getLocalMember(true)},
                          toByteMessage(msg),
-                         Channel.SEND_OPTIONS_USE_ACK);
+                         Channel.SEND_OPTIONS_USE_ACK |
+                         Channel.SEND_OPTIONS_BYTE_MESSAGE);
             if (log.isDebugEnabled()) {
                 log.debug("Sent " + msg + " to self");
             }
@@ -134,6 +137,7 @@ public class ChannelSender implements Me
                 channel.send(new Member[]{member}, toByteMessage(cmd),
                              Channel.SEND_OPTIONS_USE_ACK |
                              Channel.SEND_OPTIONS_SYNCHRONIZED_ACK |
+                             Channel.SEND_OPTIONS_BYTE_MESSAGE |
                              TribesConstants.MSG_ORDER_OPTION |
                              TribesConstants.AT_MOST_ONCE_OPTION);
                 if (log.isDebugEnabled()) {

Added: axis/axis2/java/core/branches/1_6/modules/clustering/src/org/apache/axis2/clustering/tribes/ClassLoaderUtil.java
URL: http://svn.apache.org/viewvc/axis/axis2/java/core/branches/1_6/modules/clustering/src/org/apache/axis2/clustering/tribes/ClassLoaderUtil.java?rev=1295519&view=auto
==============================================================================
--- axis/axis2/java/core/branches/1_6/modules/clustering/src/org/apache/axis2/clustering/tribes/ClassLoaderUtil.java (added)
+++ axis/axis2/java/core/branches/1_6/modules/clustering/src/org/apache/axis2/clustering/tribes/ClassLoaderUtil.java Thu Mar  1 11:28:58 2012
@@ -0,0 +1,88 @@
+/*
+*  Copyright 2004,2005 The Apache Software Foundation.
+*
+*  WSO2 Inc. licenses this file to you under the Apache License,
+*  Version 2.0 (the "License"); you may not use this file except
+*  in compliance with the License.
+*  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.axis2.clustering.tribes;
+
+import org.apache.axis2.description.AxisModule;
+import org.apache.axis2.description.AxisServiceGroup;
+import org.apache.axis2.engine.AxisConfiguration;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Registry of the classloaders to be used while serializing & deserializing Tribes messages
+ */
+public class ClassLoaderUtil {
+
+    private static final Map<String, ClassLoader> classLoaders =
+            new ConcurrentHashMap<String, ClassLoader>();
+
+    /** Registers the system, axis2, service group and module classloaders of the configuration. */
+    public static void init(AxisConfiguration configuration) {
+        register("system", configuration.getSystemClassLoader());
+        register("axis2", ClassLoaderUtil.class.getClassLoader());
+        for (Iterator iter = configuration.getServiceGroups(); iter.hasNext(); ) {
+            addServiceGroupClassLoader((AxisServiceGroup) iter.next());
+        }
+        for (Object obj : configuration.getModules().values()) {
+            addModuleClassLoader((AxisModule) obj);
+        }
+    }
+
+    /** Registers the classloader of the given service group, if it has one. */
+    public static void addServiceGroupClassLoader(AxisServiceGroup serviceGroup) {
+        register(getServiceGroupMapKey(serviceGroup), serviceGroup.getServiceGroupClassLoader());
+    }
+
+    /** Unregisters the classloader stored for the given service group. */
+    public static void removeServiceGroupClassLoader(AxisServiceGroup serviceGroup) {
+        classLoaders.remove(getServiceGroupMapKey(serviceGroup));
+    }
+
+    private static String getServiceGroupMapKey(AxisServiceGroup serviceGroup) {
+        return serviceGroup.getServiceGroupName() + "$#sg";
+    }
+
+    /** Registers the classloader of the given module, if it has one. */
+    public static void addModuleClassLoader(AxisModule module) {
+        register(getModuleMapKey(module), module.getModuleClassLoader());
+    }
+
+    /** Unregisters the classloader stored for the given module. */
+    public static void removeModuleClassLoader(AxisModule axisModule) {
+        classLoaders.remove(getModuleMapKey(axisModule));
+    }
+
+    private static String getModuleMapKey(AxisModule module) {
+        return module.getName() + "-" + module.getVersion() + "$#mod";
+    }
+
+    /** Returns a snapshot array of all currently registered classloaders. */
+    public static ClassLoader[] getClassLoaders() {
+        // zero-length seed: pre-sizing with size() can leave null slots after a concurrent remove
+        return classLoaders.values().toArray(new ClassLoader[0]);
+    }
+
+    /** Stores the loader under the key; null loaders are skipped (the map rejects null values). */
+    private static void register(String key, ClassLoader loader) {
+        if (loader != null) {
+            classLoaders.put(key, loader);
+        }
+    }
+}

Modified: axis/axis2/java/core/branches/1_6/modules/clustering/src/org/apache/axis2/clustering/tribes/MembershipManager.java
URL: http://svn.apache.org/viewvc/axis/axis2/java/core/branches/1_6/modules/clustering/src/org/apache/axis2/clustering/tribes/MembershipManager.java?rev=1295519&r1=1295518&r2=1295519&view=diff
==============================================================================
--- axis/axis2/java/core/branches/1_6/modules/clustering/src/org/apache/axis2/clustering/tribes/MembershipManager.java (original)
+++ axis/axis2/java/core/branches/1_6/modules/clustering/src/org/apache/axis2/clustering/tribes/MembershipManager.java Thu Mar  1 11:28:58 2012
@@ -37,6 +37,10 @@ import org.apache.commons.logging.LogFac
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Responsible for managing the membership. Handles membership changes.
@@ -68,10 +72,22 @@ public class MembershipManager {
     private final List<Member> wkaMembers = new ArrayList<Member>();
 
     /**
+     * List of Well-Known members which have not responded to the MEMBER_LIST message.
+     * We need to retry sending the MEMBER_LIST message to these members until they respond,
+     * otherwise, we cannot be sure whether these WKA members added the members in the MEMBER_LIST
+     */
+    private final List<Member> nonRespondingWkaMembers = new CopyOnWriteArrayList<Member>();
+
+    /**
      * The member representing this node
      */
     private Member localMember;
 
+    /**
+     * NOTE(review): presumably set once a MEMBER_LIST response has been received - confirm usage
+     */
+    private boolean isMemberListResponseReceived;
+
     public MembershipManager(ConfigurationContext configContext) {
         this.configContext = configContext;
     }
@@ -87,9 +103,10 @@ public class MembershipManager {
         return rpcMembershipChannel;
     }
 
-    public void setStaticMembershipInterceptor(
-            StaticMembershipInterceptor staticMembershipInterceptor) {
+    public void setupStaticMembershipManagement(StaticMembershipInterceptor staticMembershipInterceptor) {
         this.staticMembershipInterceptor = staticMembershipInterceptor;
+        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
+        scheduler.scheduleWithFixedDelay(new MemberListSenderTask(), 5, 5, TimeUnit.SECONDS);
     }
 
     public void setGroupManagementAgent(GroupManagementAgent groupManagementAgent) {
@@ -157,54 +174,93 @@ public class MembershipManager {
         }
 
         if (shouldAddMember) {
+            boolean wkaMemberBelongsToLocalDomain = true;
             if (rpcMembershipChannel != null && isLocalMemberInitialized() &&
                 wkaMembers.contains(member)) { // if it is a well-known member
 
                 log.info("A WKA member " + TribesUtil.getName(member) +
                          " just joined the group. Sending MEMBER_LIST message.");
-                // send the member list to it
-                MemberListCommand memListCmd;
-                try {
-                    memListCmd = new MemberListCommand();
-                    List<Member> members = new ArrayList<Member>(this.members);
-                    members.add(localMember); // Need to set the local member too
-                    memListCmd.setMembers(members.toArray(new Member[members.size()]));
-
-                    Response[] responses =
-                            rpcMembershipChannel.send(new Member[]{member}, memListCmd,
-                                                      RpcChannel.ALL_REPLY,
-                                                      Channel.SEND_OPTIONS_ASYNCHRONOUS |
-                                                      TribesConstants.MEMBERSHIP_MSG_OPTION, 10000);
-
-                    // Once a response is received from the WKA member to the MEMBER_LIST message,
-                    // if it does not belong to this domain, simply remove it from the members
-                    if (responses != null && responses.length > 0 && responses[0] != null) {
-                        Member source = responses[0].getSource();
-                        if (!TribesUtil.areInSameDomain(source, member)) {
-                            if (log.isDebugEnabled()) {
-                                log.debug("WKA Member " + TribesUtil.getName(source) +
-                                          " does not belong to local domain " + new String(domain) +
-                                          ". Hence removing it from the list.");
-                            }
-                            members.remove(member);
-                            return false;
+                wkaMemberBelongsToLocalDomain = sendMemberListToWellKnownMember(member);
+            }
+            if (wkaMemberBelongsToLocalDomain) {
+                members.add(member);
+                if (log.isDebugEnabled()) {
+                    log.debug("Added group member " + TribesUtil.getName(member) + " to domain " +
+                              new String(member.getDomain()));
+                }
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Task which re-sends the MEMBER_LIST message to each WKA member which has not yet
+     * responded to it; any failure is logged and swallowed (see the catch block in run())
+     */
+    private class MemberListSenderTask implements Runnable {
+        public void run() {
+            try {
+                if (nonRespondingWkaMembers != null && !nonRespondingWkaMembers.isEmpty()) {
+                    for (Member wkaMember : nonRespondingWkaMembers) {
+                        if (wkaMember != null) {
+                            sendMemberListToWellKnownMember(wkaMember);
                         }
                     }
-                } catch (Exception e) {
-                    String errMsg = "Could not send MEMBER_LIST to well-known member " +
-                                    TribesUtil.getName(member);
-                    log.error(errMsg, e);
-                    throw new RemoteProcessException(errMsg, e);
                 }
+            } catch (Throwable e) { // an escaping exception would suppress further scheduled runs
+                log.error("Could not send MemberList to WKA Members", e);
             }
-            members.add(member);
-            if (log.isDebugEnabled()) {
-                log.debug("Added group member " + TribesUtil.getName(member) + " to domain " +
-                          new String(member.getDomain()));
+        }
+    }
+
+    /**
+     * Send MEMBER_LIST message to WKA member. A member which does not respond is recorded in
+     * nonRespondingWkaMembers (at most once), which MemberListSenderTask iterates to retry.
+     *
+     * @param wkaMember The WKA member to whom the MEMBER_LIST has to be sent
+     * @return false - if the WKA member responded and does not belong to the same domain;
+     *         true otherwise, including the case where no response was received
+     */
+    private boolean sendMemberListToWellKnownMember(Member wkaMember) {
+        // send the member list to it
+        MemberListCommand memListCmd;
+        try {
+            memListCmd = new MemberListCommand();
+            List<Member> members = new ArrayList<Member>(this.members);
+            members.add(localMember); // Need to set the local member too
+            memListCmd.setMembers(members.toArray(new Member[members.size()]));
+
+            Response[] responses =
+                    rpcMembershipChannel.send(new Member[]{wkaMember}, memListCmd,
+                                              RpcChannel.ALL_REPLY,
+                                              Channel.SEND_OPTIONS_ASYNCHRONOUS |
+                                              TribesConstants.MEMBERSHIP_MSG_OPTION, 10000);
+
+            // Once a response is received from the WKA member to the MEMBER_LIST message,
+            // if it does not belong to this domain, report that to the caller
+            if (responses != null && responses.length > 0 && responses[0] != null) {
+                nonRespondingWkaMembers.remove(wkaMember);
+                Member source = responses[0].getSource();
+                if (!TribesUtil.areInSameDomain(source, wkaMember)) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("WKA Member " + TribesUtil.getName(source) +
+                                  " does not belong to local domain " + new String(domain) +
+                                  ". Hence removing it from the list.");
+                    }
+                    return false;
+                }
+            } else if (!nonRespondingWkaMembers.contains(wkaMember)) {
+                // No response from WKA member: queue it once; re-adding on every retry grew the list
+                nonRespondingWkaMembers.add(wkaMember);
             }
-            return true;
+        } catch (Exception e) {
+            String errMsg = "Could not send MEMBER_LIST to well-known member " +
+                            TribesUtil.getName(wkaMember);
+            log.error(errMsg, e);
+            throw new RemoteProcessException(errMsg, e);
         }
-        return false;
+        return true;
     }
 
     /**
@@ -278,6 +334,7 @@ public class MembershipManager {
      */
     public void memberDisappeared(Member member) {
         members.remove(member);
+        nonRespondingWkaMembers.remove(member);
 
         // Is this an application domain member?
         if (groupManagementAgent != null) {

Added: axis/axis2/java/core/branches/1_6/modules/clustering/src/org/apache/axis2/clustering/tribes/RpcMessagingHandler.java
URL: http://svn.apache.org/viewvc/axis/axis2/java/core/branches/1_6/modules/clustering/src/org/apache/axis2/clustering/tribes/RpcMessagingHandler.java?rev=1295519&view=auto
==============================================================================
--- axis/axis2/java/core/branches/1_6/modules/clustering/src/org/apache/axis2/clustering/tribes/RpcMessagingHandler.java (added)
+++ axis/axis2/java/core/branches/1_6/modules/clustering/src/org/apache/axis2/clustering/tribes/RpcMessagingHandler.java Thu Mar  1 11:28:58 2012
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.axis2.clustering.tribes;
+
+import org.apache.axis2.clustering.ClusteringFault;
+import org.apache.axis2.clustering.ClusteringMessage;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.RemoteProcessException;
+import org.apache.catalina.tribes.group.RpcCallback;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.io.Serializable;
+
+/**
+ * Handles RPC messages from members by executing them against the ConfigurationContext
+ */
+public class RpcMessagingHandler implements RpcCallback {
+
+    private static final Log log = LogFactory.getLog(RpcMessagingHandler.class);
+
+    private ConfigurationContext configurationContext;
+
+    public RpcMessagingHandler(ConfigurationContext configurationContext) {
+        this.configurationContext = configurationContext;
+    }
+
+    public void setConfigurationContext(ConfigurationContext configurationContext) {
+        this.configurationContext = configurationContext;
+    }
+
+    /** Executes a ClusteringMessage and returns its response; null or other types are rejected. */
+    public Serializable replyRequest(Serializable msg, Member invoker) {
+        if (log.isDebugEnabled()) {
+            log.debug("RPC request received by RpcMessagingHandler");
+        }
+        if (!(msg instanceof ClusteringMessage)) {
+            throw new IllegalArgumentException("Invalid RPC message of type " +
+                                               (msg == null ? null : msg.getClass()) + " received");
+        }
+        ClusteringMessage clusteringMsg = (ClusteringMessage) msg;
+        try {
+            clusteringMsg.execute(configurationContext);
+        } catch (ClusteringFault e) {
+            String errMsg = "Cannot handle RPC message";
+            log.error(errMsg, e);
+            throw new RemoteProcessException(errMsg, e);
+        }
+        return clusteringMsg.getResponse();
+    }
+
+    public void leftOver(Serializable msg, Member member) {
+        //TODO: Method implementation (messages passed to this callback are currently ignored)
+    }
+}

Added: axis/axis2/java/core/branches/1_6/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesAxisObserver.java
URL: http://svn.apache.org/viewvc/axis/axis2/java/core/branches/1_6/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesAxisObserver.java?rev=1295519&view=auto
==============================================================================
--- axis/axis2/java/core/branches/1_6/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesAxisObserver.java (added)
+++ axis/axis2/java/core/branches/1_6/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesAxisObserver.java Thu Mar  1 11:28:58 2012
@@ -0,0 +1,84 @@
+/*
+*  Copyright (c) 2005-2011, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
+*
+*  WSO2 Inc. licenses this file to you under the Apache License,
+*  Version 2.0 (the "License"); you may not use this file except
+*  in compliance with the License.
+*  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.axis2.clustering.tribes;
+
+import org.apache.axiom.om.OMElement;
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.description.AxisModule;
+import org.apache.axis2.description.AxisService;
+import org.apache.axis2.description.AxisServiceGroup;
+import org.apache.axis2.description.Parameter;
+import org.apache.axis2.engine.AxisConfiguration;
+import org.apache.axis2.engine.AxisEvent;
+import org.apache.axis2.engine.AxisObserver;
+
+import java.util.ArrayList;
+
+/**
+ * AxisObserver which keeps ClassLoaderUtil up to date with service group & module classloaders,
+ * which are used for message deserialization by Tribes
+ */
+public class TribesAxisObserver implements AxisObserver {
+    public void init(AxisConfiguration axisConfiguration) {
+        //Nothing to do
+    }
+
+    public void serviceUpdate(AxisEvent axisEvent, AxisService axisService) {
+        //Nothing to do: classloaders are registered per service group, see serviceGroupUpdate
+    }
+
+    public void serviceGroupUpdate(AxisEvent axisEvent, AxisServiceGroup axisServiceGroup) {
+        if (axisEvent.getEventType() == AxisEvent.SERVICE_DEPLOY) {
+            ClassLoaderUtil.addServiceGroupClassLoader(axisServiceGroup);
+        } else if (axisEvent.getEventType() == AxisEvent.SERVICE_REMOVE) {
+            ClassLoaderUtil.removeServiceGroupClassLoader(axisServiceGroup);
+        }
+    }
+
+    public void moduleUpdate(AxisEvent axisEvent, AxisModule axisModule) {
+        if (axisEvent.getEventType() == AxisEvent.MODULE_DEPLOY) {
+            ClassLoaderUtil.addModuleClassLoader(axisModule);
+        } else if (axisEvent.getEventType() == AxisEvent.MODULE_REMOVE) { // must differ from the deploy check
+            ClassLoaderUtil.removeModuleClassLoader(axisModule);
+        }
+    }
+
+    public void addParameter(Parameter parameter) throws AxisFault {
+        //Nothing to do: this observer holds no parameters
+    }
+
+    public void removeParameter(Parameter parameter) throws AxisFault {
+        //Nothing to do: this observer holds no parameters
+    }
+
+    public void deserializeParameters(OMElement omElement) throws AxisFault {
+        //Nothing to do: this observer holds no parameters
+    }
+
+    public Parameter getParameter(String carbonHome) {
+        return null;          //Nothing to do
+    }
+
+    public ArrayList<Parameter> getParameters() {
+        return null;          //Nothing to do
+    }
+
+    public boolean isParameterLocked(String carbonHome) {
+        return false;         //Nothing to do
+    }
+}

Modified: axis/axis2/java/core/branches/1_6/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusteringAgent.java
URL: http://svn.apache.org/viewvc/axis/axis2/java/core/branches/1_6/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusteringAgent.java?rev=1295519&r1=1295518&r2=1295519&view=diff
==============================================================================
--- axis/axis2/java/core/branches/1_6/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusteringAgent.java (original)
+++ axis/axis2/java/core/branches/1_6/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusteringAgent.java Thu Mar  1 11:28:58 2012
@@ -23,8 +23,10 @@ import org.apache.axiom.om.OMAttribute;
 import org.apache.axiom.om.OMElement;
 import org.apache.axis2.AxisFault;
 import org.apache.axis2.clustering.ClusteringAgent;
+import org.apache.axis2.clustering.ClusteringCommand;
 import org.apache.axis2.clustering.ClusteringConstants;
 import org.apache.axis2.clustering.ClusteringFault;
+import org.apache.axis2.clustering.ClusteringMessage;
 import org.apache.axis2.clustering.MembershipListener;
 import org.apache.axis2.clustering.MembershipScheme;
 import org.apache.axis2.clustering.RequestBlockingHandler;
@@ -48,9 +50,10 @@ import org.apache.axis2.engine.DispatchP
 import org.apache.axis2.engine.Phase;
 import org.apache.catalina.tribes.Channel;
 import org.apache.catalina.tribes.ChannelException;
+import org.apache.catalina.tribes.ErrorHandler;
 import org.apache.catalina.tribes.ManagedChannel;
 import org.apache.catalina.tribes.Member;
-import org.apache.catalina.tribes.group.GroupChannel;
+import org.apache.catalina.tribes.UniqueId;
 import org.apache.catalina.tribes.group.Response;
 import org.apache.catalina.tribes.group.RpcChannel;
 import org.apache.catalina.tribes.transport.MultiPointSender;
@@ -83,7 +86,14 @@ public class TribesClusteringAgent imple
 
     private final HashMap<String, Parameter> parameters;
     private ManagedChannel channel;
+    /**
+     * RpcChannel used for cluster initialization interactions
+     */
     private RpcChannel rpcInitChannel;
+    /**
+     * RpcChannel used for RPC messaging interactions
+     */
+    private RpcChannel rpcMessagingChannel;
     private ConfigurationContext configurationContext;
     private Axis2ChannelListener axis2ChannelListener;
     private ChannelSender channelSender;
@@ -104,6 +114,7 @@ public class TribesClusteringAgent imple
     private final Map<String, GroupManagementAgent> groupManagementAgents =
             new HashMap<String, GroupManagementAgent>();
     private boolean clusterManagementMode;
+    private RpcMessagingHandler rpcMessagingHandler;
 
     public TribesClusteringAgent() {
         parameters = new HashMap<String, Parameter>();
@@ -151,7 +162,8 @@ public class TribesClusteringAgent imple
         addRequestBlockingHandlerToInFlows();
         primaryMembershipManager = new MembershipManager(configurationContext);
 
-        channel = new GroupChannel();
+        channel = new Axis2GroupChannel();
+        channel.setHeartbeat(true);
         channelSender = new ChannelSender(channel, primaryMembershipManager, synchronizeAllMembers());
         axis2ChannelListener =
                 new Axis2ChannelListener(configurationContext, configurationManager, contextManager);
@@ -165,10 +177,19 @@ public class TribesClusteringAgent imple
         // picks it up. Each RPC is given a UUID, hence can correlate the request-response pair
         rpcInitRequestHandler = new RpcInitializationRequestHandler(configurationContext);
         rpcInitChannel =
-                new RpcChannel(TribesUtil.getRpcInitChannelId(domain),
-                               channel, rpcInitRequestHandler);
+                new RpcChannel(TribesUtil.getRpcInitChannelId(domain), channel,
+                               rpcInitRequestHandler);
         if (log.isDebugEnabled()) {
-            log.debug("Created RPC Channel for domain " + new String(domain));
+            log.debug("Created RPC Init Channel for domain " + new String(domain));
+        }
+
+        // Initialize RpcChannel used for messaging
+        rpcMessagingHandler = new RpcMessagingHandler(configurationContext);
+        rpcMessagingChannel =
+                new RpcChannel(TribesUtil.getRpcMessagingChannelId(domain), channel,
+                               rpcMessagingHandler);
+        if (log.isDebugEnabled()) {
+            log.debug("Created RPC Messaging Channel for domain " + new String(domain));
         }
 
         setMaximumRetries();
@@ -182,10 +203,8 @@ public class TribesClusteringAgent imple
             channel.start(Channel.DEFAULT); // At this point, this member joins the group
             String localHost = TribesUtil.getLocalHost(channel);
             if (localHost.startsWith("127.0.")) {
-                channel.stop(Channel.DEFAULT);
-                throw new ClusteringFault("Cannot join cluster using IP " + localHost +
-                                          ". Please set an IP address other than " +
-                                          localHost + " in the axis2.xml file");
+                log.warn("Local member advertising its IP address as 127.0.0.1. " +
+                         "Remote members will not be able to connect to this member.");
             }
         } catch (ChannelException e) {
             String msg = "Error starting Tribes channel";
@@ -198,6 +217,9 @@ public class TribesClusteringAgent imple
 
         membershipScheme.joinGroup();
 
+        configurationContext.getAxisConfiguration().addObservers(new TribesAxisObserver());
+        ClassLoaderUtil.init(configurationContext.getAxisConfiguration());
+
         // If configuration management is enabled, get the latest config from a neighbour
         if (configurationManager != null) {
             configurationManager.setSender(channelSender);
@@ -212,7 +234,6 @@ public class TribesClusteringAgent imple
             ClusteringContextListener contextListener = new ClusteringContextListener(channelSender);
             configurationContext.addContextListener(contextListener);
         }
-
         configurationContext.
                 setNonReplicableProperty(ClusteringConstants.CLUSTER_INITIALIZED, "true");
         log.info("Cluster initialization completed.");
@@ -230,6 +251,48 @@ public class TribesClusteringAgent imple
         }
     }
 
+    public List<ClusteringCommand> sendMessage(ClusteringMessage message,
+                                               boolean isRpcMessage) throws ClusteringFault { // returns RPC replies; empty list for non-RPC sends or when there are no members
+        List<ClusteringCommand> responseList = new ArrayList<ClusteringCommand>();
+        Member[] members = primaryMembershipManager.getMembers();
+        if (members.length == 0) { // no members to send to: return the empty list without touching the channel
+            return responseList;
+        }
+        if (isRpcMessage) { // RPC mode: send over rpcMessagingChannel and collect each reply
+            try {
+                Response[] responses = rpcMessagingChannel.send(members, message, RpcChannel.ALL_REPLY,
+                                                                Channel.SEND_OPTIONS_SYNCHRONIZED_ACK,
+                                                                10000); // NOTE(review): presumably the reply timeout in ms - confirm against RpcChannel.send
+                for (Response response : responses) {
+                    responseList.add((ClusteringCommand)response.getMessage()); // a reply that is not a ClusteringCommand fails this cast
+                }
+            } catch (ChannelException e) {
+                String msg = "Error occurred while sending RPC message to cluster.";
+                log.error(msg, e);
+                throw new ClusteringFault(msg, e);
+            }
+        } else { // non-RPC mode: send on the group channel; no replies are collected
+            try {
+                channel.send(members, message, 10000, new ErrorHandler(){ // NOTE(review): 3rd arg of Channel.send is an options bitmask, not a timeout - confirm 10000 is intended
+                    public void handleError(ChannelException e, UniqueId uniqueId) {
+                        log.error("Sending failed " + uniqueId, e ); // failures reported to the handler are only logged, not rethrown
+                    }
+
+                    public void handleCompletion(UniqueId uniqueId) {
+                        if(log.isDebugEnabled()){
+                            log.debug("Sending successful " + uniqueId);
+                        }
+                    }
+                });
+            } catch (ChannelException e) {
+                String msg = "Error occurred while sending message to cluster.";
+                log.error(msg, e);
+                throw new ClusteringFault(msg, e);
+            }
+        }
+        return responseList; // still empty when isRpcMessage is false
+    }
+
     private void setMemberInfo() throws ClusteringFault {
         Properties memberInfo = new Properties();
         AxisConfiguration axisConfig = configurationContext.getAxisConfiguration();
@@ -263,11 +326,14 @@ public class TribesClusteringAgent imple
                 OMElement propEle = (OMElement) iter.next();
                 OMAttribute nameAttrib = propEle.getAttribute(new QName("name"));
                 if(nameAttrib != null){
+                    String attribName = nameAttrib.getAttributeValue();
+                    attribName = replaceProperty(attribName, memberInfo);
+
                     OMAttribute valueAttrib = propEle.getAttribute(new QName("value"));
                     if  (valueAttrib != null) {
                         String attribVal = valueAttrib.getAttributeValue();
                         attribVal = replaceProperty(attribVal, memberInfo);
-                        memberInfo.setProperty(nameAttrib.getAttributeValue(), attribVal);
+                        memberInfo.setProperty(attribName, attribVal);
                     }
                 }
             }
@@ -295,10 +361,13 @@ public class TribesClusteringAgent imple
         // and are assumed to be System properties
         while (indexOfStartingChars < text.indexOf("${") &&
                (indexOfStartingChars = text.indexOf("${")) != -1 &&
-            (indexOfClosingBrace = text.indexOf("}")) != -1) { // Is a property used?
+               (indexOfClosingBrace = text.indexOf("}")) != -1) { // Is a property used?
             String sysProp = text.substring(indexOfStartingChars + 2,
                                             indexOfClosingBrace);
             String propValue = props.getProperty(sysProp);
+            if (propValue == null) {
+                propValue = System.getProperty(sysProp);
+            }
             if (propValue != null) {
                 text = text.substring(0, indexOfStartingChars) + propValue +
                        text.substring(indexOfClosingBrace + 1);
@@ -595,7 +664,7 @@ public class TribesClusteringAgent imple
      * Get some information from a neighbour. This information will be used by this node to
      * initialize itself
      * <p/>
-     * rpcChannel is The utility for sending RPC style messages to the channel
+     * rpcInitChannel is the utility for sending RPC style messages to the channel
      *
      * @param command The control command to send
      * @throws ClusteringFault If initialization code failed on this node
@@ -619,7 +688,7 @@ public class TribesClusteringAgent imple
                             primaryMembershipManager.getLongestLivingMember() : // First try to get from the longest member alive
                             primaryMembershipManager.getRandomMember(); // Else get from a random member
             String memberHost = TribesUtil.getName(member);
-            log.info("Trying to send intialization request to " + memberHost);
+            log.info("Trying to send initialization request to " + memberHost);
             try {
                 if (!sentMembersList.contains(memberHost)) {
                     Response[] responses;
@@ -627,7 +696,8 @@ public class TribesClusteringAgent imple
                     responses = rpcInitChannel.send(new Member[]{member},
                                                     command,
                                                     RpcChannel.FIRST_REPLY,
-                                                    Channel.SEND_OPTIONS_ASYNCHRONOUS,
+                                                    Channel.SEND_OPTIONS_ASYNCHRONOUS |
+                                                    Channel.SEND_OPTIONS_BYTE_MESSAGE,
                                                     10000);
                     if (responses.length == 0) {
                         try {
@@ -710,6 +780,7 @@ public class TribesClusteringAgent imple
         if (channel != null) {
             try {
                 channel.removeChannelListener(rpcInitChannel);
+                channel.removeChannelListener(rpcMessagingChannel);
                 channel.removeChannelListener(axis2ChannelListener);
                 channel.stop(Channel.DEFAULT);
             } catch (ChannelException e) {
@@ -729,6 +800,9 @@ public class TribesClusteringAgent imple
         if (rpcInitRequestHandler != null) {
             rpcInitRequestHandler.setConfigurationContext(configurationContext);
         }
+        if (rpcMessagingHandler!= null) {
+            rpcMessagingHandler.setConfigurationContext(configurationContext);
+        }
         if (axis2ChannelListener != null) {
             axis2ChannelListener.setConfigurationContext(configurationContext);
         }

Modified: axis/axis2/java/core/branches/1_6/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesConstants.java
URL: http://svn.apache.org/viewvc/axis/axis2/java/core/branches/1_6/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesConstants.java?rev=1295519&r1=1295518&r2=1295519&view=diff
==============================================================================
--- axis/axis2/java/core/branches/1_6/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesConstants.java (original)
+++ axis/axis2/java/core/branches/1_6/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesConstants.java Thu Mar  1 11:28:58 2012
@@ -26,6 +26,11 @@ public final class TribesConstants {
     public static final String RPC_INIT_CHANNEL = "rpc.init.channel";
 
     /**
+     * The ID of the RPC messaging channel
+     */
+    public static final String RPC_MESSAGING_CHANNEL = "rpc.msg.channel";
+
+    /**
      * The ID of the RPC membership message channel. This channel is only used when WKA
      * membership discovery mechanism is used
      */

Modified: axis/axis2/java/core/branches/1_6/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesUtil.java
URL: http://svn.apache.org/viewvc/axis/axis2/java/core/branches/1_6/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesUtil.java?rev=1295519&r1=1295518&r2=1295519&view=diff
==============================================================================
--- axis/axis2/java/core/branches/1_6/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesUtil.java (original)
+++ axis/axis2/java/core/branches/1_6/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesUtil.java Thu Mar  1 11:28:58 2012
@@ -101,6 +101,10 @@ public class TribesUtil {
         return (new String(domain) + ":" + TribesConstants.RPC_INIT_CHANNEL).getBytes();
     }
 
+    public static byte[] getRpcMessagingChannelId(byte[] domain) { // builds "<domain>:" + RPC_MESSAGING_CHANNEL as bytes
+        return (new String(domain) + ":" + TribesConstants.RPC_MESSAGING_CHANNEL).getBytes(); // decode/encode both use the platform default charset
+    }
+
     public static boolean isInDomain(Member member, byte[] domain) {
         return Arrays.equals(domain, member.getDomain());
     }

Modified: axis/axis2/java/core/branches/1_6/modules/clustering/src/org/apache/axis2/clustering/tribes/WkaBasedMembershipScheme.java
URL: http://svn.apache.org/viewvc/axis/axis2/java/core/branches/1_6/modules/clustering/src/org/apache/axis2/clustering/tribes/WkaBasedMembershipScheme.java?rev=1295519&r1=1295518&r2=1295519&view=diff
==============================================================================
--- axis/axis2/java/core/branches/1_6/modules/clustering/src/org/apache/axis2/clustering/tribes/WkaBasedMembershipScheme.java (original)
+++ axis/axis2/java/core/branches/1_6/modules/clustering/src/org/apache/axis2/clustering/tribes/WkaBasedMembershipScheme.java Thu Mar  1 11:28:58 2012
@@ -1,17 +1,17 @@
-/*                                                                             
- * Copyright 2004,2005 The Apache Software Foundation.                         
- *                                                                             
- * Licensed under the Apache License, Version 2.0 (the "License");             
- * you may not use this file except in compliance with the License.            
- * You may obtain a copy of the License at                                     
- *                                                                             
- *      http://www.apache.org/licenses/LICENSE-2.0                             
- *                                                                             
- * Unless required by applicable law or agreed to in writing, software         
- * distributed under the License is distributed on an "AS IS" BASIS,           
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.    
- * See the License for the specific language governing permissions and         
- * limitations under the License.                                              
+/*
+ * Copyright 2004,2005 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 package org.apache.axis2.clustering.tribes;
 
@@ -153,13 +153,13 @@ public class WkaBasedMembershipScheme im
         try {
             if (localPort != null) {
                 port = Integer.parseInt(((String) localPort.getValue()).trim());
-                port = getLocalPort(new ServerSocket(), localMember.getHostname(), port, 4000, 100);
+                port = getLocalPort(new ServerSocket(), localMember.getHostname(), port, 4000, 1000);
             } else { // In cases where the localport needs to be automatically figured out
-                port = getLocalPort(new ServerSocket(), localMember.getHostname(), -1, 4000, 100);
+                port = getLocalPort(new ServerSocket(), localMember.getHostname(), -1, 4000, 1000);
             }
         } catch (IOException e) {
             String msg =
-                    "Could not allocate the specified port or a port in the range 4000-4100 " +
+                    "Could not allocate the specified port or a port in the range 4000-5000 " +
                     "for local host " + localMember.getHostname() +
                     ". Check whether the IP address specified or inferred for the local " +
                     "member is correct.";
@@ -224,7 +224,7 @@ public class WkaBasedMembershipScheme im
                 return true;
             } catch (IOException e) {
                 String msg = e.getMessage();
-                if (msg.indexOf("Connection refused") == -1 && msg.indexOf("connect timed out") == -1) {
+                if (!msg.contains("Connection refused") && !msg.contains("connect timed out")) {
                     log.error("Cannot connect to member " +
                               member.getHostName() + ":" + member.getPort(), e);
                 }
@@ -258,7 +258,7 @@ public class WkaBasedMembershipScheme im
                 } catch (InterruptedException ignored) {
                     ignored.printStackTrace();
                 }
-                getLocalPort(socket, hostname, portstart, retries, -1);
+                portstart = getLocalPort(socket, hostname, portstart, retries, -1);
             }
         }
         return portstart;
@@ -289,7 +289,7 @@ public class WkaBasedMembershipScheme im
             log.debug("Adding Interceptors...");
         }
         TcpPingInterceptor tcpPingInterceptor = new TcpPingInterceptor();
-        tcpPingInterceptor.setInterval(100);
+        tcpPingInterceptor.setInterval(10000);
         channel.addInterceptor(tcpPingInterceptor);
         if (log.isDebugEnabled()) {
             log.debug("Added TCP Ping Interceptor");
@@ -299,7 +299,7 @@ public class WkaBasedMembershipScheme im
         TcpFailureDetector tcpFailureDetector = new TcpFailureDetector();
 //        tcpFailureDetector.setPrevious(dfi); //TODO: check this
         tcpFailureDetector.setReadTestTimeout(120000);
-        tcpFailureDetector.setConnectTimeout(60000);
+        tcpFailureDetector.setConnectTimeout(180000);
         channel.addInterceptor(tcpFailureDetector);
         if (log.isDebugEnabled()) {
             log.debug("Added TCP Failure Detector");
@@ -310,7 +310,7 @@ public class WkaBasedMembershipScheme im
 
         staticMembershipInterceptor = new StaticMembershipInterceptor();
         staticMembershipInterceptor.setLocalMember(primaryMembershipManager.getLocalMember());
-        primaryMembershipManager.setStaticMembershipInterceptor(staticMembershipInterceptor);
+        primaryMembershipManager.setupStaticMembershipManagement(staticMembershipInterceptor);
         channel.addInterceptor(staticMembershipInterceptor);
         if (log.isDebugEnabled()) {
             log.debug("Added Static Membership Interceptor");
@@ -350,7 +350,7 @@ public class WkaBasedMembershipScheme im
         // Have multiple RPC channels with multiple RPC request handlers for each localDomain
         // This is needed only when this member is running as a load balancer
         for (MembershipManager appDomainMembershipManager : applicationDomainMembershipManagers) {
-            appDomainMembershipManager.setStaticMembershipInterceptor(staticMembershipInterceptor);
+            appDomainMembershipManager.setupStaticMembershipManagement(staticMembershipInterceptor);
 
             // Create an RpcChannel for each localDomain
             String domain = new String(appDomainMembershipManager.getDomain());
@@ -377,7 +377,6 @@ public class WkaBasedMembershipScheme im
 
         // Send JOIN message to a WKA member
         if (primaryMembershipManager.getMembers().length > 0) {
-            log.info("Sending JOIN message to WKA members...");
             org.apache.catalina.tribes.Member[] wkaMembers = primaryMembershipManager.getMembers(); // The well-known members
             /*try {
                 Thread.sleep(3000); // Wait for sometime so that the WKA members can receive the MEMBER_LIST message, if they have just joined the group
@@ -386,6 +385,7 @@ public class WkaBasedMembershipScheme im
             Response[] responses = null;
             do {
                 try {
+                    log.info("Sending JOIN message to WKA members...");
                     responses = rpcMembershipChannel.send(wkaMembers,
                                                           new JoinGroupCommand(),
                                                           RpcChannel.ALL_REPLY,
@@ -394,10 +394,8 @@ public class WkaBasedMembershipScheme im
                                                           10000);
                     if (responses.length == 0) {
                         try {
-                            if (log.isDebugEnabled()) {
-                                log.debug("No responses received");
-                            }
-                            Thread.sleep(500);
+                            log.info("No responses received from WKA members");
+                            Thread.sleep(5000);
                         } catch (InterruptedException ignored) {
                         }
                     }

Modified: axis/axis2/java/core/branches/1_6/modules/kernel/src/org/apache/axis2/clustering/ClusteringAgent.java
URL: http://svn.apache.org/viewvc/axis/axis2/java/core/branches/1_6/modules/kernel/src/org/apache/axis2/clustering/ClusteringAgent.java?rev=1295519&r1=1295518&r2=1295519&view=diff
==============================================================================
--- axis/axis2/java/core/branches/1_6/modules/kernel/src/org/apache/axis2/clustering/ClusteringAgent.java (original)
+++ axis/axis2/java/core/branches/1_6/modules/kernel/src/org/apache/axis2/clustering/ClusteringAgent.java Thu Mar  1 11:28:58 2012
@@ -184,4 +184,15 @@ public interface ClusteringAgent extends
      * @return the domains of this ClusteringAgent
      */
     Set<String> getDomains();
+
+
+    /**
+     * Send a message to all members in this member's primary cluster.
+     *
+     * @param msg          The message to be sent
+     * @param isRpcMessage Indicates whether the message has to be sent in RPC mode
+     * @return The responses received if sent in RPC mode; TribesClusteringAgent returns an empty list otherwise
+     * @throws ClusteringFault If an error occurs while sending the message
+     */
+    List<ClusteringCommand> sendMessage(ClusteringMessage msg, boolean isRpcMessage) throws ClusteringFault;
 }
\ No newline at end of file

Added: axis/axis2/java/core/branches/1_6/modules/kernel/src/org/apache/axis2/clustering/ClusteringMessage.java
URL: http://svn.apache.org/viewvc/axis/axis2/java/core/branches/1_6/modules/kernel/src/org/apache/axis2/clustering/ClusteringMessage.java?rev=1295519&view=auto
==============================================================================
--- axis/axis2/java/core/branches/1_6/modules/kernel/src/org/apache/axis2/clustering/ClusteringMessage.java (added)
+++ axis/axis2/java/core/branches/1_6/modules/kernel/src/org/apache/axis2/clustering/ClusteringMessage.java Thu Mar  1 11:28:58 2012
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.axis2.clustering;
+
+/**
+ * A special {@link ClusteringCommand} which is used for messaging. If there is a response,
+ * the response can be retrieved from this command via {@link #getResponse()}.
+ */
+public abstract class ClusteringMessage extends ClusteringCommand {
+
+    /**
+     * Get the response for this message.
+     * @return the response for this message, itself a {@link ClusteringCommand}
+     */
+    public abstract ClusteringCommand getResponse();
+}