You are viewing a plain text version of this content. The canonical link for it is here.
Posted to java-dev@axis.apache.org by az...@apache.org on 2008/01/13 22:15:04 UTC

svn commit: r611651 - in /webservices/axis2/trunk/java/modules/clustering: src/org/apache/axis2/clustering/control/ src/org/apache/axis2/clustering/tribes/ test/org/apache/axis2/clustering/

Author: azeez
Date: Sun Jan 13 13:14:53 2008
New Revision: 611651

URL: http://svn.apache.org/viewvc?rev=611651&view=rev
Log:
Using an RpcChannel to send & receive node initialization messages. Each new member joining a cluster has to block until a response to a particular initialization request message is received, hence 
an RpcChannel is most the most appropriate one to be used for this scenario.


Added:
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ControlCommandProcessor.java   (contents, props changed)
      - copied, changed from r611410, webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesControlCommandProcessor.java
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/InitializationRequestHandler.java
Removed:
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesControlCommandProcessor.java
Modified:
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/GetStateResponseCommand.java
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/AtMostOnceInterceptor.java
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelListener.java
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/MembershipManager.java
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java
    webservices/axis2/trunk/java/modules/clustering/test/org/apache/axis2/clustering/ContextReplicationTest.java

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/GetStateResponseCommand.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/GetStateResponseCommand.java?rev=611651&r1=611650&r2=611651&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/GetStateResponseCommand.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/GetStateResponseCommand.java Sun Jan 13 13:14:53 2008
@@ -35,13 +35,14 @@
     private ContextClusteringCommand[] commands;
 
     public void execute(ConfigurationContext configContext) throws ClusteringFault {
-
+        log.info("Received state initialization message");
+        
         // Run this code only if this node is not already initialized
         if (configContext.
                 getPropertyNonReplicable(ClusteringConstants.RECD_STATE_INIT_MSG) == null) {
             configContext.
                 setNonReplicableProperty(ClusteringConstants.RECD_STATE_INIT_MSG, "true");
-            log.info("Received state initialization message");
+//            log.info("Received state initialization message");
             if (commands != null) {
                 for (int i = 0; i < commands.length; i++) {
                     commands[i].execute(configContext);

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/AtMostOnceInterceptor.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/AtMostOnceInterceptor.java?rev=611651&r1=611650&r2=611651&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/AtMostOnceInterceptor.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/AtMostOnceInterceptor.java Sun Jan 13 13:14:53 2008
@@ -61,15 +61,7 @@
     /**
      * The time a message lives in the receivedMessages Map
      */
-    private static final int TIMEOUT = 5 * 60 * 1000;
-
-    private Channel channel;
-
-
-    public AtMostOnceInterceptor(Channel channel) {
-        this();
-        this.channel = channel;
-    }
+    private static final int TIMEOUT = 60 * 1000;
 
     public AtMostOnceInterceptor() {
 
@@ -87,6 +79,9 @@
                 for (Iterator iterator = toBeRemoved.iterator(); iterator.hasNext();) {
                     ChannelMessage msg = (ChannelMessage) iterator.next();
                     receivedMessages.remove(msg);
+                    if (log.isDebugEnabled()) {
+                        log.debug("Cleaned up message ");
+                    }
                 }
             }
         };
@@ -96,25 +91,7 @@
     public void messageReceived(ChannelMessage msg) {
         super.messageReceived(msg);
         if (receivedMessages.get(msg) == null) {  // If it is a new message, keep track of it
-            /*XByteBuffer message1 = msg.getMessage();
-
-
-            try {
-                List classLoaders = new ArrayList();
-                classLoaders.add(AtMostOnceInterceptor.class.getClassLoader());
-                Serializable msg2 = XByteBuffer.deserialize(message1.getBytes(),
-                                                            0,
-                                                            message1.getBytes().length,
-                                                            (ClassLoader[]) classLoaders.toArray(new ClassLoader[classLoaders.size()]));
-                log.debug("###### added new msg " + TribesUtil.getLocalHost(channel) + " msg2=" + msg2);
-            } catch (Exception e) {
-                log.error("Cannot deserialize received message", e);
-                return;
-            }*/
-
-
             receivedMessages.put(msg, new Long(System.currentTimeMillis()));
-            super.messageReceived(msg);
         } else {  // If it is a duplicate message, discard it. i.e. dont call super.messageReceived
             log.info("Duplicate message received from " + TribesUtil.getHost(msg.getAddress()));
         }

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelListener.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelListener.java?rev=611651&r1=611650&r2=611651&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelListener.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelListener.java Sun Jan 13 13:14:53 2008
@@ -35,6 +35,7 @@
 import org.apache.catalina.tribes.ByteMessage;
 import org.apache.catalina.tribes.Member;
 import org.apache.catalina.tribes.RemoteProcessException;
+import org.apache.catalina.tribes.group.RpcMessage;
 import org.apache.catalina.tribes.io.XByteBuffer;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -49,14 +50,14 @@
 
     private DefaultContextManager contextManager;
     private DefaultConfigurationManager configurationManager;
-    private TribesControlCommandProcessor controlCommandProcessor;
+    private ControlCommandProcessor controlCommandProcessor;
 
     private ConfigurationContext configurationContext;
 
     public ChannelListener(ConfigurationContext configurationContext,
                            DefaultConfigurationManager configurationManager,
                            DefaultContextManager contextManager,
-                           TribesControlCommandProcessor controlCommandProcessor) {
+                           ControlCommandProcessor controlCommandProcessor) {
         this.configurationManager = configurationManager;
         this.contextManager = contextManager;
         this.controlCommandProcessor = controlCommandProcessor;
@@ -82,7 +83,7 @@
      * @return boolean
      */
     public boolean accept(Serializable msg, Member sender) {
-        return true;
+        return !(msg instanceof RpcMessage);  // RpcMessages  will not be handled by this listener
     }
 
     /**
@@ -104,8 +105,6 @@
                 AxisModule module = (AxisModule) iter.next();
                 classLoaders.add(module.getModuleClassLoader());
             }
-
-
             byte[] message = ((ByteMessage) msg).getMessage();
             msg = XByteBuffer.deserialize(message,
                                           0,
@@ -120,10 +119,7 @@
         // If the system has not still been intialized, reject all incoming messages, except the
         // GetStateResponseCommand message
         if (configurationContext.
-                getPropertyNonReplicable(ClusteringConstants.CLUSTER_INITIALIZED) == null
-            && !(msg instanceof GetStateResponseCommand) &&
-            !(msg instanceof GetConfigurationResponseCommand)) {
-
+                getPropertyNonReplicable(ClusteringConstants.CLUSTER_INITIALIZED) == null) {
             log.warn("Received message " + msg +
                      " before cluster initialization has been completed from " +
                      TribesUtil.getHost(sender));
@@ -148,8 +144,6 @@
             ctxCmd.execute(configurationContext);
         } else if (msg instanceof ConfigurationClusteringCommand && configurationManager != null) {
             configurationManager.process((ConfigurationClusteringCommand) msg);
-        } else if (msg instanceof ControlCommand && controlCommandProcessor != null) {
-            controlCommandProcessor.process((ControlCommand) msg, sender);
-        }
+        } 
     }
 }

Copied: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ControlCommandProcessor.java (from r611410, webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesControlCommandProcessor.java)
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ControlCommandProcessor.java?p2=webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ControlCommandProcessor.java&p1=webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesControlCommandProcessor.java&r1=611410&r2=611651&rev=611651&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesControlCommandProcessor.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ControlCommandProcessor.java Sun Jan 13 13:14:53 2008
@@ -26,21 +26,14 @@
 import org.apache.axis2.clustering.control.GetStateCommand;
 import org.apache.axis2.clustering.control.GetStateResponseCommand;
 import org.apache.axis2.context.ConfigurationContext;
-import org.apache.catalina.tribes.Member;
 
 /**
  * 
  */
-public class TribesControlCommandProcessor {
+public class ControlCommandProcessor {
     private ConfigurationContext configurationContext;
 
-    private ChannelSender channelSender;
-
-    public void setChannelSender(ChannelSender channelSender) {
-        this.channelSender = channelSender;
-    }
-
-    public TribesControlCommandProcessor(ConfigurationContext configurationContext) {
+    public ControlCommandProcessor(ConfigurationContext configurationContext) {
         this.configurationContext = configurationContext;
     }
 
@@ -48,7 +41,7 @@
         this.configurationContext = configurationContext;
     }
 
-    public void process(ControlCommand command, Member sender) throws ClusteringFault {
+    public ControlCommand process(ControlCommand command) throws ClusteringFault {
 
         if (command instanceof GetStateCommand) {
 
@@ -56,28 +49,27 @@
             // this node cannot send a response to the state requester. So we simply return.
             if (configurationContext.
                     getPropertyNonReplicable(ClusteringConstants.CLUSTER_INITIALIZED) == null) {
-                return;
+                return null;
             }
             command.execute(configurationContext);
             GetStateResponseCommand getStateRespCmd = new GetStateResponseCommand();
             getStateRespCmd.setCommands(((GetStateCommand) command).getCommands());
-            channelSender.sendToMember(getStateRespCmd, sender);
+            return getStateRespCmd;
         } else if (command instanceof GetConfigurationCommand) {
 
             // If a GetConfigurationCommand is received by a node which has not yet initialized
             // this node cannot send a response to the state requester. So we simply return.
             if (configurationContext.
                     getPropertyNonReplicable(ClusteringConstants.CLUSTER_INITIALIZED) == null) {
-                return;
+                return null;
             }
             command.execute(configurationContext);
             GetConfigurationResponseCommand
                     getConfigRespCmd = new GetConfigurationResponseCommand();
             getConfigRespCmd.
                     setServiceGroups(((GetConfigurationCommand) command).getServiceGroupNames());
-            channelSender.sendToMember(getConfigRespCmd, sender);
-        } else {
-            command.execute(configurationContext);
+            return getConfigRespCmd;
         }
+        return null;
     }
 }

Propchange: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ControlCommandProcessor.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/InitializationRequestHandler.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/InitializationRequestHandler.java?rev=611651&view=auto
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/InitializationRequestHandler.java (added)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/InitializationRequestHandler.java Sun Jan 13 13:14:53 2008
@@ -0,0 +1,62 @@
+/*                                                                             
+ * Copyright 2004,2005 The Apache Software Foundation.                         
+ *                                                                             
+ * Licensed under the Apache License, Version 2.0 (the "License");             
+ * you may not use this file except in compliance with the License.            
+ * You may obtain a copy of the License at                                     
+ *                                                                             
+ *      http://www.apache.org/licenses/LICENSE-2.0                             
+ *                                                                             
+ * Unless required by applicable law or agreed to in writing, software         
+ * distributed under the License is distributed on an "AS IS" BASIS,           
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.    
+ * See the License for the specific language governing permissions and         
+ * limitations under the License.                                              
+ */
+package org.apache.axis2.clustering.tribes;
+
+import org.apache.catalina.tribes.group.RpcCallback;
+import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.RemoteProcessException;
+import org.apache.axis2.clustering.control.GetConfigurationCommand;
+import org.apache.axis2.clustering.control.GetStateCommand;
+import org.apache.axis2.clustering.control.ControlCommand;
+import org.apache.axis2.clustering.ClusteringFault;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.io.Serializable;
+
+/**
+ * Handles initialization requests(GetConfiguration & GetState) from newly joining members
+ */
+public class InitializationRequestHandler implements RpcCallback {
+
+    private static Log log = LogFactory.getLog(InitializationRequestHandler.class);
+    private ControlCommandProcessor controlCommandProcessor;
+
+
+    public InitializationRequestHandler(ControlCommandProcessor controlCommandProcessor) {
+        this.controlCommandProcessor = controlCommandProcessor;
+    }
+
+    public Serializable replyRequest(Serializable msg, Member member) {
+        if (msg instanceof GetStateCommand || msg instanceof GetConfigurationCommand) {
+            try {
+                log.info("Received " + msg + " initialization request message from " +
+                         TribesUtil.getHost(member));
+                return controlCommandProcessor.process((ControlCommand) msg);
+            } catch (ClusteringFault e) {
+                String errMsg = "Cannot handle initialization request";
+                log.error(errMsg, e);
+                throw new RemoteProcessException(errMsg, e);
+            }
+        }
+        return null;
+    }
+
+    public void leftOver(Serializable msg, Member member) {
+        //TODO: Method implementation
+
+    }
+}

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/MembershipManager.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/MembershipManager.java?rev=611651&r1=611650&r2=611651&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/MembershipManager.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/MembershipManager.java Sun Jan 13 13:14:53 2008
@@ -56,8 +56,12 @@
         return longestLivingMember;
     }
 
+    public static void removeAllMembers() {
+        members.clear();
+    }
+
     public synchronized static Member getRandomMember() {
-        if(members.size() == 0){
+        if (members.size() == 0) {
             return null;
         }
         int memberIndex = new Random().nextInt(members.size());

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java?rev=611651&r1=611650&r2=611651&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java Sun Jan 13 13:14:53 2008
@@ -45,6 +45,8 @@
 import org.apache.catalina.tribes.ManagedChannel;
 import org.apache.catalina.tribes.Member;
 import org.apache.catalina.tribes.group.GroupChannel;
+import org.apache.catalina.tribes.group.Response;
+import org.apache.catalina.tribes.group.RpcChannel;
 import org.apache.catalina.tribes.group.interceptors.DomainFilterInterceptor;
 import org.apache.catalina.tribes.group.interceptors.TcpFailureDetector;
 import org.apache.catalina.tribes.transport.ReceiverBase;
@@ -65,14 +67,15 @@
 
     private HashMap parameters;
     private ManagedChannel channel;
+    private RpcChannel rpcChannel;
     private ConfigurationContext configurationContext;
-    private TribesControlCommandProcessor controlCmdProcessor;
+    private ControlCommandProcessor controlCmdProcessor;
     private ChannelListener channelListener;
     private ChannelSender channelSender;
 
     public TribesClusterManager() {
         parameters = new HashMap();
-        controlCmdProcessor = new TribesControlCommandProcessor(configurationContext);
+        controlCmdProcessor = new ControlCommandProcessor(configurationContext);
     }
 
     public ContextManager getContextManager() {
@@ -128,8 +131,6 @@
         channelListener = new ChannelListener(configurationContext, configurationManager,
                                               contextManager, controlCmdProcessor);
 
-        controlCmdProcessor.setChannelSender(channelSender);
-
         // Set the maximum number of retries, if message sending to a particular node fails
         Parameter maxRetriesParam = getParameter("maxRetries");
         int maxRetries = 10;
@@ -185,13 +186,12 @@
 //        OrderInterceptor orderInterceptor = new OrderInterceptor();
 
         // Add a AtMostOnceInterceptor to support at-most-once message processing semantics
-        AtMostOnceInterceptor atMostOnceInterceptor = new AtMostOnceInterceptor(channel);
+        AtMostOnceInterceptor atMostOnceInterceptor = new AtMostOnceInterceptor();
         channel.addInterceptor(atMostOnceInterceptor);
         atMostOnceInterceptor.setPrevious(dfi);
 
         // Add a reliable failure detector
         TcpFailureDetector tcpFailureDetector = new TcpFailureDetector();
-//        tcpFailureDetector.setPrevious(dfi);
         tcpFailureDetector.setPrevious(atMostOnceInterceptor);
         channel.addInterceptor(tcpFailureDetector);
 
@@ -213,23 +213,30 @@
             throw new ClusteringFault("Error starting Tribes channel", e);
         }
 
+        // RpcChannel is a ChannelListener. When the reply to a particular request comes back, it
+        // picks it up. Each RPC is given a UUID, hence can correlate the request-response pair
+        RpcChannel rpcChannel =
+                new RpcChannel(domain, channel,
+                               new InitializationRequestHandler(controlCmdProcessor));
+
         log.info("Local Tribes Member " + TribesUtil.getLocalHost(channel));
         TribesUtil.printMembers();
 
         // If configuration management is enabled, get the latest config from a neighbour
         if (configurationManager != null) {
             configurationManager.setSender(channelSender);
-            getInitializationMessage(channelSender, new GetConfigurationCommand());
+            initializeSystem(rpcChannel, new GetConfigurationCommand());
         }
 
-        // If context replication is enabled, get the latest state from a neighbour  
+        // If context replication is enabled, get the latest state from a neighbour
         if (contextManager != null) {
             contextManager.setSender(channelSender);
             channelListener.setContextManager(contextManager);
-            getInitializationMessage(channelSender, new GetStateCommand());
+            initializeSystem(rpcChannel, new GetStateCommand());
             ClusteringContextListener contextListener = new ClusteringContextListener(channelSender);
             configurationContext.addContextListener(contextListener);
         }
+
         configurationContext.
                 setNonReplicableProperty(ClusteringConstants.CLUSTER_INITIALIZED, "true");
     }
@@ -238,10 +245,12 @@
      * Get some information from a neighbour. This information will be used by this node to
      * initialize itself
      *
-     * @param sender  The utility for sending messages to the channel
-     * @param command The control command to send
+     * @param rpcChannel The utility for sending RPC style messages to the channel
+     * @param command    The control command to send
+     * @throws ClusteringFault If initialization code failed on this node
      */
-    private void getInitializationMessage(ChannelSender sender, ControlCommand command) {
+    private void initializeSystem(RpcChannel rpcChannel, ControlCommand command)
+            throws ClusteringFault {
         // If there is at least one member in the cluster,
         //  get the current initialization info from a member
         int numberOfTries = 0; // Don't keep on trying indefinitely
@@ -251,29 +260,34 @@
         List sentMembersList = new ArrayList();
         sentMembersList.add(TribesUtil.getLocalHost(channel));
         Member[] members = MembershipManager.getMembers();
+        if(members.length == 0) return;
 
-        while (members.length > 0 &&
-               configurationContext.
-                       getPropertyNonReplicable(ClusteringConstants.CLUSTER_INITIALIZED) == null
-               && numberOfTries < 50) {
-
-            // While there are members and GetStateResponseCommand is not received do the following
+        while (members.length > 0 && numberOfTries < 50) {
+            Member member = (numberOfTries == 0) ?
+                            MembershipManager.getLongestLivingMember() : // First try to get from the longest member alive 
+                            MembershipManager.getRandomMember(); // Else get from a random member
+            String memberHost = TribesUtil.getHost(member);
             try {
-                Member member = (numberOfTries == 0) ?
-                                MembershipManager.getLongestLivingMember() : // First try to get from the longest alive member
-                                MembershipManager.getRandomMember(); // Else get from a random member
-                if (!sentMembersList.contains(TribesUtil.getHost(member))) {
-                    sender.sendToMember(command, member);
-                    sentMembersList.add(TribesUtil.getHost(member));
-                    log.debug("WAITING FOR INITIALIZATION MESSAGE...");
-                    Thread.sleep(10 * (numberOfTries + 1));
+                if (!sentMembersList.contains(memberHost)) {
+                    Response[] responses = rpcChannel.send(new Member[]{member},
+                                                           command,
+                                                           RpcChannel.FIRST_REPLY,
+                                                           Channel.SEND_OPTIONS_ASYNCHRONOUS,
+                                                           10000);
+                    ((ControlCommand) responses[0].getMessage()).execute(configurationContext); // Do the initialization
+                    break;
+                }
+            } catch (ChannelException e) {
+                log.error("Cannot get initialization information from " +
+                          memberHost + ". Will retry in 2 secs.", e);
+                sentMembersList.add(memberHost);
+                try {
+                    Thread.sleep(2000);
+                } catch (InterruptedException ignored) {
                 }
-            } catch (Exception e) {
-                log.error("Cannot get initialization information", e);
-                break;
             }
-            members = MembershipManager.getMembers();
             numberOfTries++;
+            members = MembershipManager.getMembers();
         }
     }
 
@@ -320,6 +334,8 @@
         log.debug("Enter: TribesClusterManager::shutdown");
         if (channel != null) {
             try {
+                channel.removeChannelListener(rpcChannel);
+                channel.removeChannelListener(channelListener);
                 channel.stop(Channel.DEFAULT);
             } catch (ChannelException e) {
 

Modified: webservices/axis2/trunk/java/modules/clustering/test/org/apache/axis2/clustering/ContextReplicationTest.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/test/org/apache/axis2/clustering/ContextReplicationTest.java?rev=611651&r1=611650&r2=611651&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/test/org/apache/axis2/clustering/ContextReplicationTest.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/test/org/apache/axis2/clustering/ContextReplicationTest.java Sun Jan 13 13:14:53 2008
@@ -25,6 +25,7 @@
 import org.apache.axis2.clustering.context.DefaultContextManager;
 import org.apache.axis2.clustering.context.DefaultContextManagerListener;
 import org.apache.axis2.clustering.tribes.TribesClusterManager;
+import org.apache.axis2.clustering.tribes.MembershipManager;
 import org.apache.axis2.context.ConfigurationContext;
 import org.apache.axis2.context.ConfigurationContextFactory;
 import org.apache.axis2.context.ServiceContext;
@@ -187,7 +188,7 @@
         clusterManager1 = getClusterManager(configurationContext1, ctxMan1, configMan1);
         clusterManager1.addParameter(domainParam);
         clusterManager1.init();
-        System.out.println("ClusterManager-1 successfully initialized");
+        System.out.println("---------- ClusterManager-1 successfully initialized -----------");
 
         // Second cluster
         configurationContext2 =
@@ -199,7 +200,7 @@
         clusterManager2 = getClusterManager(configurationContext2, ctxMan2, configMan2);
         clusterManager2.addParameter(domainParam);
         clusterManager2.init();
-        System.out.println("ClusterManager-2 successfully initialized");
+        System.out.println("---------- ClusterManager-2 successfully initialized -----------");
     }
 
     protected ClusterManager getClusterManager(ConfigurationContext configCtx,
@@ -616,9 +617,13 @@
         super.tearDown();
         if (clusterManager1 != null) {
             clusterManager1.shutdown();
+            System.out.println("------ CLuster-1 shutdown complete ------");
         }
         if (clusterManager2 != null) {
             clusterManager2.shutdown();
+            System.out.println("------ CLuster-2 shutdown complete ------");
         }
+        MembershipManager.removeAllMembers();
+        Thread.sleep(500);
     }
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: axis-cvs-unsubscribe@ws.apache.org
For additional commands, e-mail: axis-cvs-help@ws.apache.org