You are viewing a plain text version of this content. The canonical link for it is here.
Posted to java-dev@axis.apache.org by az...@apache.org on 2008/01/13 22:15:04 UTC
svn commit: r611651 - in /webservices/axis2/trunk/java/modules/clustering:
src/org/apache/axis2/clustering/control/
src/org/apache/axis2/clustering/tribes/ test/org/apache/axis2/clustering/
Author: azeez
Date: Sun Jan 13 13:14:53 2008
New Revision: 611651
URL: http://svn.apache.org/viewvc?rev=611651&view=rev
Log:
Using an RpcChannel to send & receive node initialization messages. Each new member joining a cluster has to block until a response to a particular initialization request message is received, hence
an RpcChannel is the most appropriate one to be used for this scenario.
Added:
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ControlCommandProcessor.java (contents, props changed)
- copied, changed from r611410, webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesControlCommandProcessor.java
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/InitializationRequestHandler.java
Removed:
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesControlCommandProcessor.java
Modified:
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/GetStateResponseCommand.java
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/AtMostOnceInterceptor.java
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelListener.java
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/MembershipManager.java
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java
webservices/axis2/trunk/java/modules/clustering/test/org/apache/axis2/clustering/ContextReplicationTest.java
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/GetStateResponseCommand.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/GetStateResponseCommand.java?rev=611651&r1=611650&r2=611651&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/GetStateResponseCommand.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/GetStateResponseCommand.java Sun Jan 13 13:14:53 2008
@@ -35,13 +35,14 @@
private ContextClusteringCommand[] commands;
public void execute(ConfigurationContext configContext) throws ClusteringFault {
-
+ log.info("Received state initialization message");
+
// Run this code only if this node is not already initialized
if (configContext.
getPropertyNonReplicable(ClusteringConstants.RECD_STATE_INIT_MSG) == null) {
configContext.
setNonReplicableProperty(ClusteringConstants.RECD_STATE_INIT_MSG, "true");
- log.info("Received state initialization message");
+// log.info("Received state initialization message");
if (commands != null) {
for (int i = 0; i < commands.length; i++) {
commands[i].execute(configContext);
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/AtMostOnceInterceptor.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/AtMostOnceInterceptor.java?rev=611651&r1=611650&r2=611651&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/AtMostOnceInterceptor.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/AtMostOnceInterceptor.java Sun Jan 13 13:14:53 2008
@@ -61,15 +61,7 @@
/**
* The time a message lives in the receivedMessages Map
*/
- private static final int TIMEOUT = 5 * 60 * 1000;
-
- private Channel channel;
-
-
- public AtMostOnceInterceptor(Channel channel) {
- this();
- this.channel = channel;
- }
+ private static final int TIMEOUT = 60 * 1000;
public AtMostOnceInterceptor() {
@@ -87,6 +79,9 @@
for (Iterator iterator = toBeRemoved.iterator(); iterator.hasNext();) {
ChannelMessage msg = (ChannelMessage) iterator.next();
receivedMessages.remove(msg);
+ if (log.isDebugEnabled()) {
+ log.debug("Cleaned up message ");
+ }
}
}
};
@@ -96,25 +91,7 @@
public void messageReceived(ChannelMessage msg) {
super.messageReceived(msg);
if (receivedMessages.get(msg) == null) { // If it is a new message, keep track of it
- /*XByteBuffer message1 = msg.getMessage();
-
-
- try {
- List classLoaders = new ArrayList();
- classLoaders.add(AtMostOnceInterceptor.class.getClassLoader());
- Serializable msg2 = XByteBuffer.deserialize(message1.getBytes(),
- 0,
- message1.getBytes().length,
- (ClassLoader[]) classLoaders.toArray(new ClassLoader[classLoaders.size()]));
- log.debug("###### added new msg " + TribesUtil.getLocalHost(channel) + " msg2=" + msg2);
- } catch (Exception e) {
- log.error("Cannot deserialize received message", e);
- return;
- }*/
-
-
receivedMessages.put(msg, new Long(System.currentTimeMillis()));
- super.messageReceived(msg);
} else { // If it is a duplicate message, discard it. i.e. dont call super.messageReceived
log.info("Duplicate message received from " + TribesUtil.getHost(msg.getAddress()));
}
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelListener.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelListener.java?rev=611651&r1=611650&r2=611651&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelListener.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelListener.java Sun Jan 13 13:14:53 2008
@@ -35,6 +35,7 @@
import org.apache.catalina.tribes.ByteMessage;
import org.apache.catalina.tribes.Member;
import org.apache.catalina.tribes.RemoteProcessException;
+import org.apache.catalina.tribes.group.RpcMessage;
import org.apache.catalina.tribes.io.XByteBuffer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -49,14 +50,14 @@
private DefaultContextManager contextManager;
private DefaultConfigurationManager configurationManager;
- private TribesControlCommandProcessor controlCommandProcessor;
+ private ControlCommandProcessor controlCommandProcessor;
private ConfigurationContext configurationContext;
public ChannelListener(ConfigurationContext configurationContext,
DefaultConfigurationManager configurationManager,
DefaultContextManager contextManager,
- TribesControlCommandProcessor controlCommandProcessor) {
+ ControlCommandProcessor controlCommandProcessor) {
this.configurationManager = configurationManager;
this.contextManager = contextManager;
this.controlCommandProcessor = controlCommandProcessor;
@@ -82,7 +83,7 @@
* @return boolean
*/
public boolean accept(Serializable msg, Member sender) {
- return true;
+ return !(msg instanceof RpcMessage); // RpcMessages will not be handled by this listener
}
/**
@@ -104,8 +105,6 @@
AxisModule module = (AxisModule) iter.next();
classLoaders.add(module.getModuleClassLoader());
}
-
-
byte[] message = ((ByteMessage) msg).getMessage();
msg = XByteBuffer.deserialize(message,
0,
@@ -120,10 +119,7 @@
// If the system has not still been intialized, reject all incoming messages, except the
// GetStateResponseCommand message
if (configurationContext.
- getPropertyNonReplicable(ClusteringConstants.CLUSTER_INITIALIZED) == null
- && !(msg instanceof GetStateResponseCommand) &&
- !(msg instanceof GetConfigurationResponseCommand)) {
-
+ getPropertyNonReplicable(ClusteringConstants.CLUSTER_INITIALIZED) == null) {
log.warn("Received message " + msg +
" before cluster initialization has been completed from " +
TribesUtil.getHost(sender));
@@ -148,8 +144,6 @@
ctxCmd.execute(configurationContext);
} else if (msg instanceof ConfigurationClusteringCommand && configurationManager != null) {
configurationManager.process((ConfigurationClusteringCommand) msg);
- } else if (msg instanceof ControlCommand && controlCommandProcessor != null) {
- controlCommandProcessor.process((ControlCommand) msg, sender);
- }
+ }
}
}
Copied: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ControlCommandProcessor.java (from r611410, webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesControlCommandProcessor.java)
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ControlCommandProcessor.java?p2=webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ControlCommandProcessor.java&p1=webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesControlCommandProcessor.java&r1=611410&r2=611651&rev=611651&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesControlCommandProcessor.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ControlCommandProcessor.java Sun Jan 13 13:14:53 2008
@@ -26,21 +26,14 @@
import org.apache.axis2.clustering.control.GetStateCommand;
import org.apache.axis2.clustering.control.GetStateResponseCommand;
import org.apache.axis2.context.ConfigurationContext;
-import org.apache.catalina.tribes.Member;
/**
*
*/
-public class TribesControlCommandProcessor {
+public class ControlCommandProcessor {
private ConfigurationContext configurationContext;
- private ChannelSender channelSender;
-
- public void setChannelSender(ChannelSender channelSender) {
- this.channelSender = channelSender;
- }
-
- public TribesControlCommandProcessor(ConfigurationContext configurationContext) {
+ public ControlCommandProcessor(ConfigurationContext configurationContext) {
this.configurationContext = configurationContext;
}
@@ -48,7 +41,7 @@
this.configurationContext = configurationContext;
}
- public void process(ControlCommand command, Member sender) throws ClusteringFault {
+ public ControlCommand process(ControlCommand command) throws ClusteringFault {
if (command instanceof GetStateCommand) {
@@ -56,28 +49,27 @@
// this node cannot send a response to the state requester. So we simply return.
if (configurationContext.
getPropertyNonReplicable(ClusteringConstants.CLUSTER_INITIALIZED) == null) {
- return;
+ return null;
}
command.execute(configurationContext);
GetStateResponseCommand getStateRespCmd = new GetStateResponseCommand();
getStateRespCmd.setCommands(((GetStateCommand) command).getCommands());
- channelSender.sendToMember(getStateRespCmd, sender);
+ return getStateRespCmd;
} else if (command instanceof GetConfigurationCommand) {
// If a GetConfigurationCommand is received by a node which has not yet initialized
// this node cannot send a response to the state requester. So we simply return.
if (configurationContext.
getPropertyNonReplicable(ClusteringConstants.CLUSTER_INITIALIZED) == null) {
- return;
+ return null;
}
command.execute(configurationContext);
GetConfigurationResponseCommand
getConfigRespCmd = new GetConfigurationResponseCommand();
getConfigRespCmd.
setServiceGroups(((GetConfigurationCommand) command).getServiceGroupNames());
- channelSender.sendToMember(getConfigRespCmd, sender);
- } else {
- command.execute(configurationContext);
+ return getConfigRespCmd;
}
+ return null;
}
}
Propchange: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ControlCommandProcessor.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/InitializationRequestHandler.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/InitializationRequestHandler.java?rev=611651&view=auto
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/InitializationRequestHandler.java (added)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/InitializationRequestHandler.java Sun Jan 13 13:14:53 2008
@@ -0,0 +1,62 @@
+/*
+ * Copyright 2004,2005 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.axis2.clustering.tribes;
+
+import org.apache.catalina.tribes.group.RpcCallback;
+import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.RemoteProcessException;
+import org.apache.axis2.clustering.control.GetConfigurationCommand;
+import org.apache.axis2.clustering.control.GetStateCommand;
+import org.apache.axis2.clustering.control.ControlCommand;
+import org.apache.axis2.clustering.ClusteringFault;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.io.Serializable;
+
+/**
+ * Handles initialization requests(GetConfiguration & GetState) from newly joining members
+ */
+public class InitializationRequestHandler implements RpcCallback {
+
+ private static Log log = LogFactory.getLog(InitializationRequestHandler.class);
+ private ControlCommandProcessor controlCommandProcessor;
+
+
+ public InitializationRequestHandler(ControlCommandProcessor controlCommandProcessor) {
+ this.controlCommandProcessor = controlCommandProcessor;
+ }
+
+ public Serializable replyRequest(Serializable msg, Member member) {
+ if (msg instanceof GetStateCommand || msg instanceof GetConfigurationCommand) {
+ try {
+ log.info("Received " + msg + " initialization request message from " +
+ TribesUtil.getHost(member));
+ return controlCommandProcessor.process((ControlCommand) msg);
+ } catch (ClusteringFault e) {
+ String errMsg = "Cannot handle initialization request";
+ log.error(errMsg, e);
+ throw new RemoteProcessException(errMsg, e);
+ }
+ }
+ return null;
+ }
+
+ public void leftOver(Serializable msg, Member member) {
+ //TODO: Method implementation
+
+ }
+}
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/MembershipManager.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/MembershipManager.java?rev=611651&r1=611650&r2=611651&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/MembershipManager.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/MembershipManager.java Sun Jan 13 13:14:53 2008
@@ -56,8 +56,12 @@
return longestLivingMember;
}
+ public static void removeAllMembers() {
+ members.clear();
+ }
+
public synchronized static Member getRandomMember() {
- if(members.size() == 0){
+ if (members.size() == 0) {
return null;
}
int memberIndex = new Random().nextInt(members.size());
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java?rev=611651&r1=611650&r2=611651&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java Sun Jan 13 13:14:53 2008
@@ -45,6 +45,8 @@
import org.apache.catalina.tribes.ManagedChannel;
import org.apache.catalina.tribes.Member;
import org.apache.catalina.tribes.group.GroupChannel;
+import org.apache.catalina.tribes.group.Response;
+import org.apache.catalina.tribes.group.RpcChannel;
import org.apache.catalina.tribes.group.interceptors.DomainFilterInterceptor;
import org.apache.catalina.tribes.group.interceptors.TcpFailureDetector;
import org.apache.catalina.tribes.transport.ReceiverBase;
@@ -65,14 +67,15 @@
private HashMap parameters;
private ManagedChannel channel;
+ private RpcChannel rpcChannel;
private ConfigurationContext configurationContext;
- private TribesControlCommandProcessor controlCmdProcessor;
+ private ControlCommandProcessor controlCmdProcessor;
private ChannelListener channelListener;
private ChannelSender channelSender;
public TribesClusterManager() {
parameters = new HashMap();
- controlCmdProcessor = new TribesControlCommandProcessor(configurationContext);
+ controlCmdProcessor = new ControlCommandProcessor(configurationContext);
}
public ContextManager getContextManager() {
@@ -128,8 +131,6 @@
channelListener = new ChannelListener(configurationContext, configurationManager,
contextManager, controlCmdProcessor);
- controlCmdProcessor.setChannelSender(channelSender);
-
// Set the maximum number of retries, if message sending to a particular node fails
Parameter maxRetriesParam = getParameter("maxRetries");
int maxRetries = 10;
@@ -185,13 +186,12 @@
// OrderInterceptor orderInterceptor = new OrderInterceptor();
// Add a AtMostOnceInterceptor to support at-most-once message processing semantics
- AtMostOnceInterceptor atMostOnceInterceptor = new AtMostOnceInterceptor(channel);
+ AtMostOnceInterceptor atMostOnceInterceptor = new AtMostOnceInterceptor();
channel.addInterceptor(atMostOnceInterceptor);
atMostOnceInterceptor.setPrevious(dfi);
// Add a reliable failure detector
TcpFailureDetector tcpFailureDetector = new TcpFailureDetector();
-// tcpFailureDetector.setPrevious(dfi);
tcpFailureDetector.setPrevious(atMostOnceInterceptor);
channel.addInterceptor(tcpFailureDetector);
@@ -213,23 +213,30 @@
throw new ClusteringFault("Error starting Tribes channel", e);
}
+ // RpcChannel is a ChannelListener. When the reply to a particular request comes back, it
+ // picks it up. Each RPC is given a UUID, hence can correlate the request-response pair
+ RpcChannel rpcChannel =
+ new RpcChannel(domain, channel,
+ new InitializationRequestHandler(controlCmdProcessor));
+
log.info("Local Tribes Member " + TribesUtil.getLocalHost(channel));
TribesUtil.printMembers();
// If configuration management is enabled, get the latest config from a neighbour
if (configurationManager != null) {
configurationManager.setSender(channelSender);
- getInitializationMessage(channelSender, new GetConfigurationCommand());
+ initializeSystem(rpcChannel, new GetConfigurationCommand());
}
- // If context replication is enabled, get the latest state from a neighbour
+ // If context replication is enabled, get the latest state from a neighbour
if (contextManager != null) {
contextManager.setSender(channelSender);
channelListener.setContextManager(contextManager);
- getInitializationMessage(channelSender, new GetStateCommand());
+ initializeSystem(rpcChannel, new GetStateCommand());
ClusteringContextListener contextListener = new ClusteringContextListener(channelSender);
configurationContext.addContextListener(contextListener);
}
+
configurationContext.
setNonReplicableProperty(ClusteringConstants.CLUSTER_INITIALIZED, "true");
}
@@ -238,10 +245,12 @@
* Get some information from a neighbour. This information will be used by this node to
* initialize itself
*
- * @param sender The utility for sending messages to the channel
- * @param command The control command to send
+ * @param rpcChannel The utility for sending RPC style messages to the channel
+ * @param command The control command to send
+ * @throws ClusteringFault If initialization code failed on this node
*/
- private void getInitializationMessage(ChannelSender sender, ControlCommand command) {
+ private void initializeSystem(RpcChannel rpcChannel, ControlCommand command)
+ throws ClusteringFault {
// If there is at least one member in the cluster,
// get the current initialization info from a member
int numberOfTries = 0; // Don't keep on trying indefinitely
@@ -251,29 +260,34 @@
List sentMembersList = new ArrayList();
sentMembersList.add(TribesUtil.getLocalHost(channel));
Member[] members = MembershipManager.getMembers();
+ if(members.length == 0) return;
- while (members.length > 0 &&
- configurationContext.
- getPropertyNonReplicable(ClusteringConstants.CLUSTER_INITIALIZED) == null
- && numberOfTries < 50) {
-
- // While there are members and GetStateResponseCommand is not received do the following
+ while (members.length > 0 && numberOfTries < 50) {
+ Member member = (numberOfTries == 0) ?
+ MembershipManager.getLongestLivingMember() : // First try to get from the longest member alive
+ MembershipManager.getRandomMember(); // Else get from a random member
+ String memberHost = TribesUtil.getHost(member);
try {
- Member member = (numberOfTries == 0) ?
- MembershipManager.getLongestLivingMember() : // First try to get from the longest alive member
- MembershipManager.getRandomMember(); // Else get from a random member
- if (!sentMembersList.contains(TribesUtil.getHost(member))) {
- sender.sendToMember(command, member);
- sentMembersList.add(TribesUtil.getHost(member));
- log.debug("WAITING FOR INITIALIZATION MESSAGE...");
- Thread.sleep(10 * (numberOfTries + 1));
+ if (!sentMembersList.contains(memberHost)) {
+ Response[] responses = rpcChannel.send(new Member[]{member},
+ command,
+ RpcChannel.FIRST_REPLY,
+ Channel.SEND_OPTIONS_ASYNCHRONOUS,
+ 10000);
+ ((ControlCommand) responses[0].getMessage()).execute(configurationContext); // Do the initialization
+ break;
+ }
+ } catch (ChannelException e) {
+ log.error("Cannot get initialization information from " +
+ memberHost + ". Will retry in 2 secs.", e);
+ sentMembersList.add(memberHost);
+ try {
+ Thread.sleep(2000);
+ } catch (InterruptedException ignored) {
}
- } catch (Exception e) {
- log.error("Cannot get initialization information", e);
- break;
}
- members = MembershipManager.getMembers();
numberOfTries++;
+ members = MembershipManager.getMembers();
}
}
@@ -320,6 +334,8 @@
log.debug("Enter: TribesClusterManager::shutdown");
if (channel != null) {
try {
+ channel.removeChannelListener(rpcChannel);
+ channel.removeChannelListener(channelListener);
channel.stop(Channel.DEFAULT);
} catch (ChannelException e) {
Modified: webservices/axis2/trunk/java/modules/clustering/test/org/apache/axis2/clustering/ContextReplicationTest.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/test/org/apache/axis2/clustering/ContextReplicationTest.java?rev=611651&r1=611650&r2=611651&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/test/org/apache/axis2/clustering/ContextReplicationTest.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/test/org/apache/axis2/clustering/ContextReplicationTest.java Sun Jan 13 13:14:53 2008
@@ -25,6 +25,7 @@
import org.apache.axis2.clustering.context.DefaultContextManager;
import org.apache.axis2.clustering.context.DefaultContextManagerListener;
import org.apache.axis2.clustering.tribes.TribesClusterManager;
+import org.apache.axis2.clustering.tribes.MembershipManager;
import org.apache.axis2.context.ConfigurationContext;
import org.apache.axis2.context.ConfigurationContextFactory;
import org.apache.axis2.context.ServiceContext;
@@ -187,7 +188,7 @@
clusterManager1 = getClusterManager(configurationContext1, ctxMan1, configMan1);
clusterManager1.addParameter(domainParam);
clusterManager1.init();
- System.out.println("ClusterManager-1 successfully initialized");
+ System.out.println("---------- ClusterManager-1 successfully initialized -----------");
// Second cluster
configurationContext2 =
@@ -199,7 +200,7 @@
clusterManager2 = getClusterManager(configurationContext2, ctxMan2, configMan2);
clusterManager2.addParameter(domainParam);
clusterManager2.init();
- System.out.println("ClusterManager-2 successfully initialized");
+ System.out.println("---------- ClusterManager-2 successfully initialized -----------");
}
protected ClusterManager getClusterManager(ConfigurationContext configCtx,
@@ -616,9 +617,13 @@
super.tearDown();
if (clusterManager1 != null) {
clusterManager1.shutdown();
+ System.out.println("------ CLuster-1 shutdown complete ------");
}
if (clusterManager2 != null) {
clusterManager2.shutdown();
+ System.out.println("------ CLuster-2 shutdown complete ------");
}
+ MembershipManager.removeAllMembers();
+ Thread.sleep(500);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: axis-cvs-unsubscribe@ws.apache.org
For additional commands, e-mail: axis-cvs-help@ws.apache.org