You are viewing a plain text version of this content. The canonical link for it is here.
Posted to axis-cvs@ws.apache.org by az...@apache.org on 2008/01/14 21:44:14 UTC
svn commit: r611922 - in /webservices/axis2/trunk/java/modules/clustering:
src/org/apache/axis2/clustering/tribes/ test/org/apache/axis2/clustering/
Author: azeez
Date: Mon Jan 14 12:44:05 2008
New Revision: 611922
URL: http://svn.apache.org/viewvc?rev=611922&view=rev
Log:
1. Fixing a few potential issues in the test cases
2. Making the methods in MembershipManager non-static
3. Adding an OrderInterceptor to preserve sender ordering
4. Fixing a bug in the AtMostOnceInterceptor
Modified:
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/AtMostOnceInterceptor.java
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/MembershipManager.java
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesMembershipListener.java
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesUtil.java
webservices/axis2/trunk/java/modules/clustering/test/org/apache/axis2/clustering/ContextReplicationTest.java
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/AtMostOnceInterceptor.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/AtMostOnceInterceptor.java?rev=611922&r1=611921&r2=611922&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/AtMostOnceInterceptor.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/AtMostOnceInterceptor.java Mon Jan 14 12:44:05 2008
@@ -15,45 +15,21 @@
*/
package org.apache.axis2.clustering.tribes;
-import org.apache.catalina.tribes.ChannelException;
-import org.apache.catalina.tribes.ChannelInterceptor;
import org.apache.catalina.tribes.ChannelMessage;
-import org.apache.catalina.tribes.Member;
-import org.apache.catalina.tribes.RemoteProcessException;
-import org.apache.catalina.tribes.Channel;
-import org.apache.catalina.tribes.ByteMessage;
-import org.apache.catalina.tribes.util.UUIDGenerator;
-import org.apache.catalina.tribes.io.ChannelData;
-import org.apache.catalina.tribes.io.XByteBuffer;
-import org.apache.catalina.tribes.membership.Membership;
-import org.apache.catalina.tribes.membership.MemberImpl;
import org.apache.catalina.tribes.group.ChannelInterceptorBase;
-import org.apache.catalina.tribes.group.InterceptorPayload;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.axis2.engine.AxisConfiguration;
-import org.apache.axis2.description.AxisServiceGroup;
-import org.apache.axis2.description.AxisModule;
+import java.util.ArrayList;
import java.util.HashMap;
-import java.util.Arrays;
-import java.util.Map;
-import java.util.TimerTask;
-import java.util.Timer;
import java.util.Iterator;
import java.util.List;
-import java.util.ArrayList;
-import java.net.Socket;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.SocketTimeoutException;
-import java.net.ConnectException;
-import java.io.Serializable;
+import java.util.Map;
/**
* Message intereceptor for handling at-most-once message processing semantics
*/
-public class AtMostOnceInterceptor extends ChannelInterceptorBase {
+public class AtMostOnceInterceptor extends ChannelInterceptorBase {
private static Log log = LogFactory.getLog(AtMostOnceInterceptor.class);
private static final Map receivedMessages = new HashMap();
@@ -64,36 +40,58 @@
private static final int TIMEOUT = 60 * 1000;
public AtMostOnceInterceptor() {
+ Thread cleanupThread = new Thread(new MessageCleanupTask());
+ cleanupThread.setPriority(Thread.MIN_PRIORITY);
+ cleanupThread.start();
+ }
- TimerTask cleanupTask = new TimerTask() {
- public void run() {
- List toBeRemoved = new ArrayList();
- for (Iterator iterator = receivedMessages.keySet().iterator();
- iterator.hasNext();) {
- ChannelMessage msg = (ChannelMessage) iterator.next();
- long arrivalTime = ((Long) receivedMessages.get(msg)).longValue();
- if (System.currentTimeMillis() - arrivalTime >= TIMEOUT) {
- toBeRemoved.add(msg);
- }
+ public void messageReceived(ChannelMessage msg) {
+ synchronized (receivedMessages) {
+ if (receivedMessages.get(msg) == null) { // If it is a new message, keep track of it
+ receivedMessages.put(msg, new Long(System.currentTimeMillis()));
+ super.messageReceived(msg);
+ } else { // If it is a duplicate message, discard it. i.e. dont call super.messageReceived
+ log.info("Duplicate message received from " + TribesUtil.getHost(msg.getAddress()));
+ }
+ }
+ }
+
+ private class MessageCleanupTask implements Runnable {
+
+ public void run() {
+ while (true) {
+ try {
+ Thread.sleep(TIMEOUT);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
}
- for (Iterator iterator = toBeRemoved.iterator(); iterator.hasNext();) {
- ChannelMessage msg = (ChannelMessage) iterator.next();
- receivedMessages.remove(msg);
- if (log.isDebugEnabled()) {
- log.debug("Cleaned up message ");
+ try {
+ List toBeRemoved = new ArrayList();
+ synchronized (receivedMessages) {
+ for (Iterator iterator = receivedMessages.keySet().iterator();
+ iterator.hasNext();) {
+ ChannelMessage msg = (ChannelMessage) iterator.next();
+ long arrivalTime = ((Long) receivedMessages.get(msg)).longValue();
+ if (System.currentTimeMillis() - arrivalTime >= TIMEOUT) {
+ toBeRemoved.add(msg);
+ }
+ }
+ long start = System.currentTimeMillis();
+ for (Iterator iterator = toBeRemoved.iterator(); iterator.hasNext();) {
+ ChannelMessage msg = (ChannelMessage) iterator.next();
+ receivedMessages.remove(msg);
+ if (log.isDebugEnabled()) {
+ log.debug("Cleaned up message ");
+ }
+ if(System.currentTimeMillis() - start > 30000){
+ break;
+ }
+ }
}
+ } catch (Exception e) {
+ log.error("Exception occurred while trying to cleanup messages", e);
}
}
- };
- new Timer().scheduleAtFixedRate(cleanupTask, TIMEOUT, TIMEOUT);
- }
-
- public void messageReceived(ChannelMessage msg) {
- super.messageReceived(msg);
- if (receivedMessages.get(msg) == null) { // If it is a new message, keep track of it
- receivedMessages.put(msg, new Long(System.currentTimeMillis()));
- } else { // If it is a duplicate message, discard it. i.e. dont call super.messageReceived
- log.info("Duplicate message received from " + TribesUtil.getHost(msg.getAddress()));
}
}
}
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java?rev=611922&r1=611921&r2=611922&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java Mon Jan 14 12:44:05 2008
@@ -39,9 +39,13 @@
private Log log = LogFactory.getLog(ChannelSender.class);
private Channel channel;
private boolean synchronizeAllMembers;
+ private MembershipManager membershipManager;
- public ChannelSender(Channel channel, boolean synchronizeAllMembers) {
+ public ChannelSender(Channel channel,
+ MembershipManager membershipManager,
+ boolean synchronizeAllMembers) {
this.channel = channel;
+ this.membershipManager = membershipManager;
this.synchronizeAllMembers = synchronizeAllMembers;
}
@@ -49,7 +53,7 @@
if (channel == null) {
return;
}
- Member[] members = MembershipManager.getMembers();
+ Member[] members = membershipManager.getMembers();
// Keep retrying, since at the point of trying to send the msg, a member may leave the group
// causing a view change. All nodes in a view should get the msg
@@ -57,7 +61,9 @@
try {
if (synchronizeAllMembers) {
channel.send(members, toByteMessage(msg),
- Channel.SEND_OPTIONS_USE_ACK | Channel.SEND_OPTIONS_SYNCHRONIZED_ACK);
+ Channel.SEND_OPTIONS_USE_ACK |
+ Channel.SEND_OPTIONS_SYNCHRONIZED_ACK |
+ TribesClusterManager.MSG_ORDER_OPTION);
} else {
channel.send(members, toByteMessage(msg), Channel.SEND_OPTIONS_ASYNCHRONOUS);
}
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/MembershipManager.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/MembershipManager.java?rev=611922&r1=611921&r2=611922&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/MembershipManager.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/MembershipManager.java Mon Jan 14 12:44:05 2008
@@ -25,21 +25,21 @@
* Responsible for managing the membership
*/
public class MembershipManager {
- private static final List members = new ArrayList();
+ private final List members = new ArrayList();
- public synchronized static void memberAdded(Member member) {
+ public synchronized void memberAdded(Member member) {
members.add(member);
}
- public synchronized static void memberDisappeared(Member member) {
+ public synchronized void memberDisappeared(Member member) {
members.remove(member);
}
- public synchronized static Member[] getMembers() {
+ public synchronized Member[] getMembers() {
return (Member[]) members.toArray(new Member[members.size()]);
}
- public synchronized static Member getLongestLivingMember() {
+ public synchronized Member getLongestLivingMember() {
Member longestLivingMember = null;
if (members.size() > 0) {
Member member0 = (Member) members.get(0);
@@ -56,11 +56,11 @@
return longestLivingMember;
}
- public static void removeAllMembers() {
+ public synchronized void removeAllMembers() {
members.clear();
}
- public synchronized static Member getRandomMember() {
+ public synchronized Member getRandomMember() {
if (members.size() == 0) {
return null;
}
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java?rev=611922&r1=611921&r2=611922&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java Mon Jan 14 12:44:05 2008
@@ -49,6 +49,7 @@
import org.apache.catalina.tribes.group.RpcChannel;
import org.apache.catalina.tribes.group.interceptors.DomainFilterInterceptor;
import org.apache.catalina.tribes.group.interceptors.TcpFailureDetector;
+import org.apache.catalina.tribes.group.interceptors.OrderInterceptor;
import org.apache.catalina.tribes.transport.ReceiverBase;
import org.apache.catalina.tribes.transport.ReplicationTransmitter;
import org.apache.commons.logging.Log;
@@ -60,6 +61,7 @@
import java.util.List;
public class TribesClusterManager implements ClusterManager {
+ public static final int MSG_ORDER_OPTION = 512;
private static final Log log = LogFactory.getLog(TribesClusterManager.class);
private DefaultConfigurationManager configurationManager;
@@ -72,6 +74,7 @@
private ControlCommandProcessor controlCmdProcessor;
private ChannelListener channelListener;
private ChannelSender channelSender;
+ private MembershipManager membershipManager;
public TribesClusterManager() {
parameters = new HashMap();
@@ -125,9 +128,9 @@
}
}
}
-
+ membershipManager = new MembershipManager();
channel = new GroupChannel();
- channelSender = new ChannelSender(channel, synchronizeAllMembers());
+ channelSender = new ChannelSender(channel, membershipManager, synchronizeAllMembers());
channelListener = new ChannelListener(configurationContext, configurationManager,
contextManager, controlCmdProcessor);
@@ -183,20 +186,22 @@
mcastProps.setProperty("tcpListenPort", "4000");
mcastProps.setProperty("tcpListenHost", "127.0.0.1");*/
-// OrderInterceptor orderInterceptor = new OrderInterceptor();
+ // Add the OrderInterceptor to preserve sender ordering
+ OrderInterceptor orderInterceptor = new OrderInterceptor();
+ orderInterceptor.setOptionFlag(MSG_ORDER_OPTION);
+ channel.addInterceptor(orderInterceptor);
// Add a AtMostOnceInterceptor to support at-most-once message processing semantics
AtMostOnceInterceptor atMostOnceInterceptor = new AtMostOnceInterceptor();
channel.addInterceptor(atMostOnceInterceptor);
- atMostOnceInterceptor.setPrevious(dfi);
// Add a reliable failure detector
TcpFailureDetector tcpFailureDetector = new TcpFailureDetector();
- tcpFailureDetector.setPrevious(atMostOnceInterceptor);
channel.addInterceptor(tcpFailureDetector);
channel.addChannelListener(channelListener);
- TribesMembershipListener membershipListener = new TribesMembershipListener();
+
+ TribesMembershipListener membershipListener = new TribesMembershipListener(membershipManager);
channel.addMembershipListener(membershipListener);
try {
channel.start(Channel.DEFAULT);
@@ -219,8 +224,8 @@
new RpcChannel(domain, channel,
new InitializationRequestHandler(controlCmdProcessor));
- log.info("Local Tribes Member " + TribesUtil.getLocalHost(channel));
- TribesUtil.printMembers();
+ log.info("Local Member " + TribesUtil.getLocalHost(channel));
+ TribesUtil.printMembers(membershipManager);
// If configuration management is enabled, get the latest config from a neighbour
if (configurationManager != null) {
@@ -259,13 +264,13 @@
// Do not send another request to these members
List sentMembersList = new ArrayList();
sentMembersList.add(TribesUtil.getLocalHost(channel));
- Member[] members = MembershipManager.getMembers();
+ Member[] members = membershipManager.getMembers();
if(members.length == 0) return;
- while (members.length > 0 && numberOfTries < 50) {
+ while (members.length > 0 && numberOfTries < 5) {
Member member = (numberOfTries == 0) ?
- MembershipManager.getLongestLivingMember() : // First try to get from the longest member alive
- MembershipManager.getRandomMember(); // Else get from a random member
+ membershipManager.getLongestLivingMember() : // First try to get from the longest member alive
+ membershipManager.getRandomMember(); // Else get from a random member
String memberHost = TribesUtil.getHost(member);
try {
if (!sentMembersList.contains(memberHost)) {
@@ -274,8 +279,10 @@
RpcChannel.FIRST_REPLY,
Channel.SEND_OPTIONS_ASYNCHRONOUS,
10000);
- ((ControlCommand) responses[0].getMessage()).execute(configurationContext); // Do the initialization
- break;
+ if (responses.length > 0) {
+ ((ControlCommand) responses[0].getMessage()).execute(configurationContext); // Do the initialization
+ break;
+ }
}
} catch (ChannelException e) {
log.error("Cannot get initialization information from " +
@@ -284,10 +291,14 @@
try {
Thread.sleep(2000);
} catch (InterruptedException ignored) {
+ log.debug("Interrupted", ignored);
}
}
numberOfTries++;
- members = MembershipManager.getMembers();
+ members = membershipManager.getMembers();
+ if(numberOfTries >= members.length){
+ break;
+ }
}
}
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesMembershipListener.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesMembershipListener.java?rev=611922&r1=611921&r2=611922&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesMembershipListener.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesMembershipListener.java Mon Jan 14 12:44:05 2008
@@ -23,22 +23,27 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-/**
+/** In
*
*/
public class TribesMembershipListener implements MembershipListener {
private static Log log = LogFactory.getLog(TribesMembershipListener.class);
+ private MembershipManager membershipManager;
+
+ public TribesMembershipListener(MembershipManager membershipManager) {
+ this.membershipManager = membershipManager;
+ }
public void memberAdded(Member member) {
log.info("New member " + TribesUtil.getHost(member) + " joined cluster.");
- MembershipManager.memberAdded(member);
+ membershipManager.memberAdded(member);
// System.err.println("++++++ IS COORD="+TribesClusterManager.nbc.isCoordinator());
}
public void memberDisappeared(Member member) {
log.info("Member " + TribesUtil.getHost(member) + " left cluster");
- MembershipManager.memberDisappeared(member);
+ membershipManager.memberDisappeared(member);
// System.err.println("++++++ IS COORD="+TribesClusterManager.nbc.isCoordinator());
}
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesUtil.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesUtil.java?rev=611922&r1=611921&r2=611922&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesUtil.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesUtil.java Mon Jan 14 12:44:05 2008
@@ -28,8 +28,8 @@
private static Log log = LogFactory.getLog(TribesUtil.class);
- public static void printMembers() {
- Member[] members = MembershipManager.getMembers();
+ public static void printMembers(MembershipManager membershipManager) {
+ Member[] members = membershipManager.getMembers();
if (members != null) {
int length = members.length;
if (length > 0) {
Modified: webservices/axis2/trunk/java/modules/clustering/test/org/apache/axis2/clustering/ContextReplicationTest.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/test/org/apache/axis2/clustering/ContextReplicationTest.java?rev=611922&r1=611921&r2=611922&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/test/org/apache/axis2/clustering/ContextReplicationTest.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/test/org/apache/axis2/clustering/ContextReplicationTest.java Mon Jan 14 12:44:05 2008
@@ -25,7 +25,6 @@
import org.apache.axis2.clustering.context.DefaultContextManager;
import org.apache.axis2.clustering.context.DefaultContextManagerListener;
import org.apache.axis2.clustering.tribes.TribesClusterManager;
-import org.apache.axis2.clustering.tribes.MembershipManager;
import org.apache.axis2.context.ConfigurationContext;
import org.apache.axis2.context.ConfigurationContextFactory;
import org.apache.axis2.context.ServiceContext;
@@ -36,14 +35,14 @@
import org.apache.axis2.engine.AxisConfiguration;
import org.apache.axis2.transport.http.server.HttpUtils;
-import java.util.ArrayList;
-import java.util.List;
-import java.net.MulticastSocket;
-import java.net.InetAddress;
+import java.io.IOException;
import java.net.DatagramPacket;
-import java.net.ServerSocket;
+import java.net.InetAddress;
import java.net.InetSocketAddress;
-import java.io.IOException;
+import java.net.MulticastSocket;
+import java.net.ServerSocket;
+import java.util.ArrayList;
+import java.util.List;
/**
* Tests the replication of properties placed the ConfigurationContext, ServiceGroupContext &
@@ -127,7 +126,6 @@
byte buf[] = new byte[1024];
DatagramPacket pack = new DatagramPacket(buf, buf.length);
s.receive(pack);
-
System.out.println("Received data from: " + pack.getAddress().toString() +
":" + pack.getPort() + " with length: " +
pack.getLength());
@@ -163,6 +161,13 @@
}
};
sender.start();
+
+ // Join the receiver until we can verify whether multicasting can be done
+ try {
+ receiver.join(10000);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
}
public static void main(String[] args) {
@@ -267,6 +272,7 @@
String val2 = "configCtxVal1";
configurationContext2.setProperty(key2, val2);
ctxMan2.updateContext(configurationContext2);
+ Thread.sleep(1000);
String value = (String) configurationContext1.getProperty(key2);
assertEquals(val2, value);
}
@@ -347,7 +353,7 @@
// Remove the property
serviceGroupContext2.removeProperty(key1);
assertNull(serviceGroupContext2.getProperty(key1));
- ctxMan1.updateContext(serviceGroupContext2);
+ ctxMan2.updateContext(serviceGroupContext2);
assertNull(serviceGroupContext1.getProperty(key1));
}
@@ -408,7 +414,7 @@
// Remove the property
serviceGroupContext2.removeProperty(key1);
assertNull(serviceGroupContext2.getProperty(key1));
- ctxMan1.updateContext(serviceGroupContext2);
+ ctxMan2.updateContext(serviceGroupContext2);
assertNull(serviceGroupContext1.getProperty(key1));
}
@@ -502,7 +508,7 @@
// Remove the property
serviceContext2.removeProperty(key1);
assertNull(serviceContext2.getProperty(key1));
- ctxMan1.updateContext(serviceContext2);
+ ctxMan2.updateContext(serviceContext2);
assertNull(serviceContext1.getProperty(key1));
}
@@ -538,7 +544,7 @@
// Remove the property
serviceContext2.removeProperty(key1);
assertNull(serviceContext2.getProperty(key1));
- ctxMan1.updateContext(serviceContext2);
+ ctxMan2.updateContext(serviceContext2);
assertNull(serviceContext1.getProperty(key1));
}
@@ -623,7 +629,7 @@
clusterManager2.shutdown();
System.out.println("------ CLuster-2 shutdown complete ------");
}
- MembershipManager.removeAllMembers();
+// MembershipManager.removeAllMembers();
Thread.sleep(500);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: axis-cvs-unsubscribe@ws.apache.org
For additional commands, e-mail: axis-cvs-help@ws.apache.org