You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by fh...@apache.org on 2006/05/03 20:48:04 UTC
svn commit: r399382 - in
/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes:
Channel.java MembershipService.java group/ChannelCoordinator.java
membership/McastService.java membership/McastServiceImpl.java
Author: fhanik
Date: Wed May 3 11:48:01 2006
New Revision: 399382
URL: http://svn.apache.org/viewcvs?rev=399382&view=rev
Log:
Start levels are now respected correctly, this will make unit testing easier since we can shutdown and start different components to simulate errors
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/Channel.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/MembershipService.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelCoordinator.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastService.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastServiceImpl.java
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/Channel.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/Channel.java?rev=399382&r1=399381&r2=399382&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/Channel.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/Channel.java Wed May 3 11:48:01 2006
@@ -30,9 +30,9 @@
* Start and stop sequences can be controlled by these constants
*/
public static final int DEFAULT = 15;
- public static final int MBR_RX_SEQ = 1;
+ public static final int SND_RX_SEQ = 1;
public static final int SND_TX_SEQ = 2;
- public static final int SND_RX_SEQ = 4;
+ public static final int MBR_RX_SEQ = 4;
public static final int MBR_TX_SEQ = 8;
/**
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/MembershipService.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/MembershipService.java?rev=399382&r1=399381&r2=399382&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/MembershipService.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/MembershipService.java Wed May 3 11:48:01 2006
@@ -27,8 +27,8 @@
public interface MembershipService {
- public static final int MBR_RX = 1;
- public static final int MBR_TX = 2;
+ public static final int MBR_RX = Channel.MBR_RX_SEQ;
+ public static final int MBR_TX = Channel.MBR_TX_SEQ;
/**
* Sets the properties for the membership service. This must be called before
@@ -52,17 +52,24 @@
/**
* Starts the membership service. If a membership listeners is added
* the listener will start to receive membership events.
- * @param level - level 1 starts listening for members, level 2
+ * @param level - level MBR_RX starts listening for members, level MBR_TX
* starts broad casting the server
* @throws java.lang.Exception if the service fails to start.
+ * @throws java.lang.IllegalArgumentException if the level is incorrect.
*/
public void start(int level) throws java.lang.Exception;
/**
- * Stops the membership service
+ * Starts the membership service. If a membership listeners is added
+ * the listener will start to receive membership events.
+ * @param level - level MBR_RX stops listening for members, level MBR_TX
+ * stops broad casting the server
+ * @throws java.lang.Exception if the service fails to stop
+ * @throws java.lang.IllegalArgumentException if the level is incorrect.
*/
- public void stop();
+
+ public void stop(int level);
/**
* Returns that cluster has members.
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelCoordinator.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelCoordinator.java?rev=399382&r1=399381&r2=399382&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelCoordinator.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelCoordinator.java Wed May 3 11:48:01 2006
@@ -41,10 +41,13 @@
private ChannelReceiver clusterReceiver = new NioReceiver();
private ChannelSender clusterSender = new ReplicationTransmitter();
private MembershipService membershipService = new McastService();
- private boolean started = false;
+ //override optionflag
protected int optionFlag = Channel.SEND_OPTIONS_BYTE_MESSAGE|Channel.SEND_OPTIONS_USE_ACK|Channel.SEND_OPTIONS_SYNCHRONIZED_ACK;
+ public int getOptionFlag() {return optionFlag;}
+ public void setOptionFlag(int flag) {optionFlag=flag;}
+ private int startLevel = 0;
public ChannelCoordinator() {
@@ -70,7 +73,7 @@
if ( destination == null ) destination = membershipService.getMembers();
clusterSender.sendMessage(msg,destination);
}
-
+
/**
* Starts up the channel. This can be called multiple times for individual services to start
@@ -84,7 +87,7 @@
* @throws ChannelException if a startup error occurs or the service is already started.
*/
public void start(int svc) throws ChannelException {
- this.start();
+ this.internalStart(svc);
}
/**
@@ -99,7 +102,7 @@
* @throws ChannelException if a startup error occurs or the service is already started.
*/
public void stop(int svc) throws ChannelException {
- this.stop();
+ this.internalStop(svc);
}
@@ -114,20 +117,38 @@
* SND_RX_SEQ - starts the replication receiver<BR>
* @throws ChannelException if a startup error occurs or the service is already started.
*/
- protected synchronized void start() throws ChannelException {
+ protected synchronized void internalStart(int svc) throws ChannelException {
try {
- if (started) return;
+ boolean valid = false;
+ if (startLevel == Channel.DEFAULT) return;
+
//must start the receiver first so that we can coordinate the port it
//listens to with the local membership settings
- clusterReceiver.setMessageListener(this);
- clusterReceiver.start();
- clusterSender.start();
- //synchronize, big time FIXME
- membershipService.setMembershipListener(this);
- membershipService.setLocalMemberProperties(getClusterReceiver().getHost(), getClusterReceiver().getPort());
- membershipService.start(MembershipService.MBR_RX);
- membershipService.start(MembershipService.MBR_TX);
- this.started = true;
+ if ( Channel.SND_RX_SEQ==(svc & Channel.SND_RX_SEQ) ) {
+ clusterReceiver.setMessageListener(this);
+ clusterReceiver.start();
+ //synchronize, big time FIXME
+ membershipService.setLocalMemberProperties(getClusterReceiver().getHost(), getClusterReceiver().getPort());
+ valid = true;
+ }
+ if ( Channel.SND_TX_SEQ==(svc & Channel.SND_TX_SEQ) ) {
+ clusterSender.start();
+ valid = true;
+ }
+
+ if ( Channel.MBR_RX_SEQ==(svc & Channel.MBR_RX_SEQ) ) {
+ membershipService.setMembershipListener(this);
+ membershipService.start(MembershipService.MBR_RX);
+ valid = true;
+ }
+ if ( Channel.MBR_TX_SEQ==(svc & Channel.MBR_TX_SEQ) ) {
+ membershipService.start(MembershipService.MBR_TX);
+ valid = true;
+ }
+ if ( !valid) {
+ throw new IllegalArgumentException("Invalid start level, valid levels are:SND_RX_SEQ,SND_TX_SEQ,MBR_TX_SEQ,MBR_RX_SEQ");
+ }
+ startLevel = (startLevel | svc);
}catch ( ChannelException cx ) {
throw cx;
}catch ( Exception x ) {
@@ -146,17 +167,40 @@
* SND_RX_SEQ - starts the replication receiver<BR>
* @throws ChannelException if a startup error occurs or the service is already started.
*/
- protected synchronized void stop() throws ChannelException {
+ protected synchronized void internalStop(int svc) throws ChannelException {
try {
- if ( !started ) return;
- membershipService.stop();
- clusterReceiver.stop();
- clusterSender.stop();
- membershipService.stop();
+ boolean valid = false;
+ if ( startLevel == 0 ) return;
+ if ( Channel.SND_RX_SEQ==(svc & Channel.SND_RX_SEQ) ) {
+ clusterReceiver.stop();
+ clusterReceiver.setMessageListener(null);
+ valid = true;
+ }
+ if ( Channel.SND_TX_SEQ==(svc & Channel.SND_TX_SEQ) ) {
+ clusterSender.stop();
+ valid = true;
+ }
+
+ if ( Channel.MBR_RX_SEQ==(svc & Channel.MBR_RX_SEQ) ) {
+ membershipService.stop(MembershipService.MBR_RX);
+ membershipService.setMembershipListener(null);
+ valid = true;
+
+ }
+ if ( Channel.MBR_TX_SEQ==(svc & Channel.MBR_TX_SEQ) ) {
+ valid = true;
+ membershipService.stop(MembershipService.MBR_TX);
+ }
+ if ( !valid) {
+ throw new IllegalArgumentException("Invalid start level, valid levels are:SND_RX_SEQ,SND_TX_SEQ,MBR_TX_SEQ,MBR_RX_SEQ");
+ }
+
+ startLevel = (startLevel & (~svc));
+
}catch ( Exception x ) {
throw new ChannelException(x);
} finally {
- started = false;
+
}
}
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastService.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastService.java?rev=399382&r1=399381&r2=399382&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastService.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastService.java Wed May 3 11:48:01 2006
@@ -205,7 +205,7 @@
* @param name The property to check for
*/
protected void hasProperty(Properties properties, String name){
- if ( properties.getProperty(name)==null) throw new IllegalArgumentException("Required property \""+name+"\" is missing.");
+ if ( properties.getProperty(name)==null) throw new IllegalArgumentException("McastService:Required property \""+name+"\" is missing.");
}
/**
@@ -218,6 +218,14 @@
}
public void start(int level) throws java.lang.Exception {
+ hasProperty(properties,"mcastPort");
+ hasProperty(properties,"mcastAddress");
+ hasProperty(properties,"mcastClusterDomain");
+ hasProperty(properties,"memberDropTime");
+ hasProperty(properties,"mcastFrequency");
+ hasProperty(properties,"tcpListenPort");
+ hasProperty(properties,"tcpListenHost");
+
if ( impl != null ) {
impl.start(level);
return;
@@ -279,13 +287,12 @@
/**
* Stop broadcasting and listening to membership pings
*/
- public void stop() {
+ public void stop(int svc) {
try {
- if ( impl != null) impl.stop();
+ if ( impl != null && impl.stop(svc) ) impl = null;
} catch ( Exception x) {
- log.error("Unable to stop the mcast service.",x);
+ log.error("Unable to stop the mcast service, level:"+svc+".",x);
}
- impl = null;
}
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastServiceImpl.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastServiceImpl.java?rev=399382&r1=399381&r2=399382&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastServiceImpl.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastServiceImpl.java Wed May 3 11:48:01 2006
@@ -26,6 +26,7 @@
import java.util.Arrays;
import java.net.SocketTimeoutException;
import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.Channel;
/**
* A <b>membership</b> implementation using simple multicast.
@@ -45,7 +46,9 @@
/**
* Internal flag used for the listen thread that listens to the multicasting socket.
*/
- protected boolean doRun = false;
+ protected boolean doRunSender = false;
+ protected boolean doRunReceiver = false;
+ protected int startLevel = 0;
/**
* Socket that we intend to listen to
*/
@@ -183,44 +186,70 @@
* @throws IllegalStateException if the service is already started
*/
public synchronized void start(int level) throws IOException {
- if ( sender != null && receiver != null ) throw new IllegalStateException("Service already running.");
- if ( level == 1 ) {
+ boolean valid = false;
+ if ( (level & Channel.MBR_RX_SEQ)==Channel.MBR_RX_SEQ ) {
+ if ( receiver != null ) throw new IllegalStateException("McastService.receive already running.");
socket.joinGroup(address);
- doRun = true;
+ doRunReceiver = true;
receiver = new ReceiverThread();
receiver.setDaemon(true);
receiver.start();
- }
- if ( level==2 ) {
+ valid = true;
+ }
+ if ( (level & Channel.MBR_TX_SEQ)==Channel.MBR_TX_SEQ ) {
+ if ( sender != null ) throw new IllegalStateException("McastService.send already running.");
+ doRunSender = true;
serviceStartTime = System.currentTimeMillis();
sender = new SenderThread(sendFrequency);
sender.setDaemon(true);
sender.start();
-
+ valid = true;
+ }
+ if (!valid) {
+ throw new IllegalArgumentException("Invalid start level. Only acceptable levels are Channel.MBR_RX_SEQ and Channel.MBR_TX_SEQ");
}
+ startLevel = (startLevel | level);
}
/**
* Stops the service
* @throws IOException if the service fails to disconnect from the sockets
*/
- public synchronized void stop() throws IOException {
- doRun = false;
- sender.interrupt();
- receiver.interrupt();
- sender = null;
- receiver = null;
- //send a stop message
- byte[] payload = member.getPayload();
- member.setPayload(Member.SHUTDOWN_PAYLOAD);
- member.getData(true,true);
- send();
- //restore payload
- member.setPayload(payload);
- member.getData(true,true);
- //leave mcast group
- socket.leaveGroup(address);
- serviceStartTime = Long.MAX_VALUE;
+ public synchronized boolean stop(int level) throws IOException {
+ boolean valid = false;
+
+ if ( (level & Channel.MBR_RX_SEQ)==Channel.MBR_RX_SEQ ) {
+ valid = true;
+ doRunReceiver = false;
+ receiver.interrupt();
+ receiver = null;
+ }
+ if ( (level & Channel.MBR_TX_SEQ)==Channel.MBR_TX_SEQ ) {
+ valid = true;
+ doRunSender = false;
+ sender.interrupt();
+ sender = null;
+ }
+
+ if (!valid) {
+ throw new IllegalArgumentException("Invalid stop level. Only acceptable levels are Channel.MBR_RX_SEQ and Channel.MBR_TX_SEQ");
+ }
+ startLevel = (startLevel & (~level));
+ //we're shutting down, send a shutdown message and close the socket
+ if ( startLevel == 0 ) {
+ //send a stop message
+ byte[] payload = member.getPayload();
+ member.setPayload(Member.SHUTDOWN_PAYLOAD);
+ member.getData(true, true);
+ send();
+ //restore payload
+ member.setPayload(payload);
+ member.getData(true, true);
+ //leave mcast group
+ socket.leaveGroup(address);
+ serviceStartTime = Long.MAX_VALUE;
+ }
+ return (startLevel == 0);
}
/**
@@ -293,7 +322,7 @@
setName("Cluster-MembershipReceiver");
}
public void run() {
- while ( doRun ) {
+ while ( doRunReceiver ) {
try {
receive();
} catch ( Exception x ) {
@@ -313,7 +342,7 @@
}
public void run() {
- while ( doRun ) {
+ while ( doRunSender ) {
try {
send();
} catch ( Exception x ) {
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org