You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by fh...@apache.org on 2006/05/02 17:31:53 UTC
svn commit: r398964 - in
/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes:
./ membership/
Author: fhanik
Date: Tue May 2 08:31:11 2006
New Revision: 398964
URL: http://svn.apache.org/viewcvs?rev=398964&view=rev
Log:
Added in a stop message for a soft shutdown
Added in the ability to send a payload with each broadcast package
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/Member.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/MembershipService.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastMembership.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastService.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastServiceImpl.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/MemberImpl.java
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/Member.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/Member.java?rev=398964&r1=398963&r2=398964&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/Member.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/Member.java Tue May 2 08:31:11 2006
@@ -73,4 +73,10 @@
*/
public byte[] getUniqueId();
+ /**
+ * returns the payload associated with this member
+ * @return byte[]
+ */
+ public byte[] getPayload();
+
}
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/MembershipService.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/MembershipService.java?rev=398964&r1=398963&r2=398964&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/MembershipService.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/MembershipService.java Tue May 2 08:31:11 2006
@@ -108,5 +108,12 @@
* removes the membership listener.
*/
public void removeMembershipListener();
+
+ /**
+ * Set a payload to be broadcasted with each membership
+ * broadcast.
+ * @param payload byte[]
+ */
+ public void setPayload(byte[] payload);
}
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastMembership.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastMembership.java?rev=398964&r1=398963&r2=398964&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastMembership.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastMembership.java Tue May 2 08:31:11 2006
@@ -134,7 +134,7 @@
synchronized (members) {
int n = -1;
for (int i = 0; i < members.length; i++) {
- if (members[i] == member) {
+ if (members[i] == member || members[i].equals(member)) {
n = i;
break;
}
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastService.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastService.java?rev=398964&r1=398963&r2=398964&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastService.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastService.java Tue May 2 08:31:11 2006
@@ -68,6 +68,8 @@
protected MemberImpl localMember ;
private int mcastSoTimeout;
private int mcastTTL;
+
+ protected byte[] payload;
/**
* Create a membership service.
@@ -232,6 +234,7 @@
localMember.setPort(port);
localMember.setMemberAliveTime(100);
}
+ if ( this.payload != null ) localMember.setPayload(payload);
localMember.setUniqueId(UUIDGenerator.randomUUID(true));
localMember.setServiceStartTime(System.currentTimeMillis());
java.net.InetAddress bind = null;
@@ -372,9 +375,18 @@
public int getMcastTTL() {
return mcastTTL;
}
+
+ public byte[] getPayload() {
+ return payload;
+ }
+
public void setMcastTTL(int mcastTTL) {
this.mcastTTL = mcastTTL;
properties.setProperty("mcastTTL", String.valueOf(mcastTTL));
+ }
+
+ public void setPayload(byte[] payload) {
+ this.payload = payload;
}
/**
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastServiceImpl.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastServiceImpl.java?rev=398964&r1=398963&r2=398964&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastServiceImpl.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastServiceImpl.java Tue May 2 08:31:11 2006
@@ -23,6 +23,8 @@
import java.net.MulticastSocket;
import org.apache.catalina.tribes.MembershipListener;
+import java.util.Arrays;
+import java.net.SocketTimeoutException;
/**
* A <b>membership</b> implementation using simple multicast.
@@ -97,9 +99,20 @@
*/
protected long serviceStartTime = System.currentTimeMillis();
+ /**
+ * Time to live for the multicast packets that are being sent out
+ */
protected int mcastTTL = -1;
+ /**
+ * Read timeout on the mcast socket
+ */
protected int mcastSoTimeout = -1;
+ /**
+ * bind address
+ */
protected InetAddress mcastBindAddress = null;
+
+ protected static final byte[] STOP_PAYLOAD = new byte[] {66, 65, 66, 89, 45, 65, 76, 69, 88};
/**
* Create a new mcast service impl
@@ -130,10 +143,10 @@
this.mcastTTL = ttl;
this.mcastBindAddress = bind;
setupSocket();
- sendPacket = new DatagramPacket(new byte[1000],1000);
+ sendPacket = new DatagramPacket(new byte[1024],1024);
sendPacket.setAddress(address);
sendPacket.setPort(port);
- receivePacket = new DatagramPacket(new byte[1000],1000);
+ receivePacket = new DatagramPacket(new byte[1024],1024);
receivePacket.setAddress(address);
receivePacket.setPort(port);
membership = new McastMembership(member);
@@ -193,10 +206,18 @@
* @throws IOException if the service fails to disconnect from the sockets
*/
public synchronized void stop() throws IOException {
- socket.leaveGroup(address);
doRun = false;
+ sender.interrupt();
+ receiver.interrupt();
sender = null;
receiver = null;
+ //send a stop message
+ byte[] payload = member.getPayload();
+ member.setPayload(STOP_PAYLOAD);
+ send();
+ //restore payload
+ member.setPayload(payload);
+ socket.leaveGroup(address);
serviceStartTime = Long.MAX_VALUE;
}
@@ -205,23 +226,42 @@
* @throws IOException
*/
public void receive() throws IOException {
- socket.receive(receivePacket);
- byte[] data = new byte[receivePacket.getLength()];
- System.arraycopy(receivePacket.getData(),receivePacket.getOffset(),data,0,data.length);
- MemberImpl m = MemberImpl.getMember(data);
- if(log.isDebugEnabled())
- log.debug("Mcast receive ping from member " + m);
+ checkExpired();
+ try {
+ socket.receive(receivePacket);
+ byte[] data = new byte[receivePacket.getLength()];
+ System.arraycopy(receivePacket.getData(), receivePacket.getOffset(), data, 0, data.length);
+ MemberImpl m = MemberImpl.getMember(data);
+ if (log.isDebugEnabled())
+ log.debug("Mcast receive ping from member " + m);
- if ( membership.memberAlive(m) ) {
- if(log.isDebugEnabled())
- log.debug("Mcast add member " + m);
- service.memberAdded(m);
+ if (Arrays.equals(m.getPayload(), STOP_PAYLOAD)) {
+ if (log.isDebugEnabled())
+ log.debug("Mcast member shutdown" + m);
+ membership.removeMcastMember(m);
+ service.memberDisappeared(m);
+ } else if (membership.memberAlive(m)) {
+ if (log.isDebugEnabled())
+ log.debug("Mcast add member " + m);
+ service.memberAdded(m);
+ } //end if
+ } catch (SocketTimeoutException x ) {
+ //do nothing, this is normal, we don't want to block forever
+ //since the receive thread is the same thread
+ //that does membership expiration
}
+ }
+
+ protected synchronized void checkExpired() {
MemberImpl[] expired = membership.expire(timeToExpiration);
for ( int i=0; i<expired.length; i++) {
if(log.isDebugEnabled())
- log.debug("Mcast exipre member " + m);
- service.memberDisappeared(expired[i]);
+ log.debug("Mcast exipre member " + expired[i]);
+ try {
+ service.memberDisappeared(expired[i]);
+ }catch ( Exception x ) {
+ log.error("Unable to process member disappeared message.",x);
+ }
}
}
@@ -229,7 +269,7 @@
* Send a ping
* @throws Exception
*/
- public void send() throws Exception{
+ public void send() throws IOException{
member.inc();
if(log.isDebugEnabled())
log.debug("Mcast send ping from member " + member);
@@ -238,6 +278,7 @@
p.setAddress(address);
p.setPort(port);
socket.send(p);
+ checkExpired();
}
public long getServiceStartTime() {
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/MemberImpl.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/MemberImpl.java?rev=398964&r1=398963&r2=398964&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/MemberImpl.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/MemberImpl.java Tue May 2 08:31:11 2006
@@ -83,8 +83,22 @@
*/
protected transient long serviceStartTime;
+ /**
+ * To avoid serialization over and over again, once the local dataPkg
+ * has been set, we use that to transmit data
+ */
protected transient byte[] dataPkg = null;
- private byte[] uniqueId = new byte[16];
+
+ /**
+ * Unique session Id for this member
+ */
+ protected byte[] uniqueId = new byte[16];
+
+ /**
+ * Custom payload that an app framework can broadcast
+ * Also used to transport stop command.
+ */
+ protected byte[] payload = new byte[0];
/**
* Empty constructor for serialization
@@ -178,13 +192,15 @@
//dlen - 4 bytes
//domain - dlen bytes
//uniqueId - 16 bytes
+ //payload length - 4 bytes
+ //payload plen bytes
byte[] domaind = this.domain;
byte[] addr = host;
long alive=System.currentTimeMillis()-getServiceStartTime();
byte hl = (byte)addr.length;
- byte[] data = new byte[8+4+1+addr.length+4+domaind.length+16];
+ byte[] data = new byte[8+4+1+addr.length+4+domaind.length+16+4+payload.length];
int pos = 0;
//alive data
XByteBuffer.toBytes((long)alive,data,0);
@@ -205,6 +221,14 @@
pos+=domaind.length;
//unique Id
System.arraycopy(uniqueId,0,data,pos,uniqueId.length);
+ pos+=uniqueId.length;
+ //payload
+ XByteBuffer.toBytes(payload.length,data,pos);
+ pos+=4;
+ System.arraycopy(payload,0,data,pos,payload.length);
+ pos+=payload.length;
+
+ //create local data
dataPkg = data;
return data;
}
@@ -247,12 +271,21 @@
byte[] uniqueId = new byte[16];
System.arraycopy(data, pos, uniqueId, 0, 16);
+ pos+=16;
+
+ int pl = XByteBuffer.toInt(data,pos);
+ pos+=4;
+
+ byte[] payload = new byte[pl];
+ System.arraycopy(data, pos, payload, 0, payload.length);
+ pos+=payload.length;
member.domain = domaind;
member.setHost(addr);
member.setPort(XByteBuffer.toInt(portd, 0));
member.setMemberAliveTime(XByteBuffer.toLong(alived, 0));
member.setUniqueId(uniqueId);
+ member.payload = payload;
member.dataPkg = new byte[data.length];
System.arraycopy(data,0,member.dataPkg,0,data.length);
@@ -327,6 +360,10 @@
return uniqueId;
}
+ public byte[] getPayload() {
+ return payload;
+ }
+
public void setMemberAliveTime(long time) {
memberAliveTime=time;
}
@@ -343,7 +380,8 @@
buf.append(getHostname()).append(",");
buf.append(port).append(", alive=");
buf.append(memberAliveTime).append(",");
- buf.append("id=").append(bToS(this.uniqueId));
+ buf.append("id=").append(bToS(this.uniqueId)).append(", ");
+ buf.append("payload=").append(bToS(this.payload)).append(", ");
buf.append("]");
return buf.toString();
}
@@ -466,6 +504,10 @@
public void setUniqueId(byte[] uniqueId) {
this.uniqueId = uniqueId;
+ }
+
+ public void setPayload(byte[] payload) {
+ this.payload = payload;
}
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org