You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by fh...@apache.org on 2006/07/08 01:53:44 UTC
svn commit: r420015 - in /tomcat/container/tc5.5.x/modules/groupcom: ./
src/share/org/apache/catalina/tribes/group/
src/share/org/apache/catalina/tribes/membership/
Author: fhanik
Date: Fri Jul 7 16:53:44 2006
New Revision: 420015
URL: http://svn.apache.org/viewvc?rev=420015&view=rev
Log:
Fixed tracing, and added a protocol around multicast packets as well
Modified:
tomcat/container/tc5.5.x/modules/groupcom/VERSION
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/Constants.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastServiceImpl.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/MemberImpl.java
Modified: tomcat/container/tc5.5.x/modules/groupcom/VERSION
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/VERSION?rev=420015&r1=420014&r2=420015&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/VERSION (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/VERSION Fri Jul 7 16:53:44 2006
@@ -1,3 +1,6 @@
+0.9.5.1
+ - create a protocol around the multicast packages, so that we can go to nio eventually
+ - corrected tracing
0.9.5.0
- added message tracers
0.9.4.9
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java?rev=420015&r1=420014&r2=420015&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java Fri Jul 7 16:53:44 2006
@@ -257,10 +257,12 @@
//get the actual member with the correct alive time
Member source = msg.getAddress();
boolean rx = false;
+ boolean delivered = false;
for ( int i=0; i<channelListeners.size(); i++ ) {
ChannelListener channelListener = (ChannelListener)channelListeners.get(i);
if (channelListener != null && channelListener.accept(fwd, source)) {
channelListener.messageReceived(fwd, source);
+ delivered = true;
//if the message was accepted by an RPC channel, that channel
//is responsible for returning the reply, otherwise we send an absence reply
if ( channelListener instanceof RpcChannel ) rx = true;
@@ -272,7 +274,7 @@
sendNoRpcChannelReply((RpcMessage)fwd,source);
}
if ( Logs.MESSAGES.isTraceEnabled() ) {
- Logs.MESSAGES.trace("GroupChannel delivered["+rx+"] id:"+new UniqueId(msg.getUniqueId()));
+ Logs.MESSAGES.trace("GroupChannel delivered["+delivered+"] id:"+new UniqueId(msg.getUniqueId()));
}
} catch ( Exception x ) {
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/Constants.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/Constants.java?rev=420015&r1=420014&r2=420015&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/Constants.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/Constants.java Fri Jul 7 16:53:44 2006
@@ -17,6 +17,8 @@
package org.apache.catalina.tribes.membership;
+import org.apache.catalina.tribes.util.Arrays;
+
/**
* Manifest constants for the <code>org.apache.catalina.tribes.membership</code>
@@ -24,10 +26,14 @@
*
* @author Peter Rossbach
* @version $Revision: 303950 $ $Date: 2005-06-09 15:38:30 -0500 (Thu, 09 Jun 2005) $
+ * @author Filip Hanik
*/
public class Constants {
public static final String Package = "org.apache.catalina.tribes.membership";
-
+ public static void main(String[] args) throws Exception {
+ System.out.println(Arrays.toString("TRIBES-B".getBytes()));
+ System.out.println(Arrays.toString("TRIBES-E".getBytes()));
+ }
}
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastServiceImpl.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastServiceImpl.java?rev=420015&r1=420014&r2=420015&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastServiceImpl.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastServiceImpl.java Fri Jul 7 16:53:44 2006
@@ -338,8 +338,8 @@
//ignore if we haven't started the sender
//if ( (startLevel&Channel.MBR_TX_SEQ) != Channel.MBR_TX_SEQ ) return;
member.inc();
- if(log.isDebugEnabled())
- log.debug("Mcast send ping from member " + member);
+ if(log.isTraceEnabled())
+ log.trace("Mcast send ping from member " + member);
byte[] data = member.getData();
DatagramPacket p = new DatagramPacket(data,data.length);
p.setAddress(address);
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/MemberImpl.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/MemberImpl.java?rev=420015&r1=420014&r2=420015&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/MemberImpl.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/MemberImpl.java Fri Jul 7 16:53:44 2006
@@ -42,6 +42,9 @@
public static final transient String TCP_LISTEN_HOST = "tcpListenHost";
public static final transient String MEMBER_NAME = "memberName";
+ public static final transient byte[] TRIBES_MBR_BEGIN = new byte[] {84, 82, 73, 66, 69, 83, 45, 66};
+ public static final transient byte[] TRIBES_MBR_END = new byte[] {84, 82, 73, 66, 69, 83, 45, 69};
+
/**
* The listen host for this member
*/
@@ -163,7 +166,20 @@
public int getDataLength() {
- return 8+4+1+host.length+4+command.length+4+domain.length+16+4+payload.length;
+ return TRIBES_MBR_BEGIN.length+ //start pkg
+ 4+ //data length
+ 8+ //alive time
+ 4+ //port
+ 1+ //host length
+ host.length+ //host
+ 4+ //command length
+ command.length+ //command
+ 4+ //domain length
+ domain.length+ //domain
+ 16+ //unique id
+ 4+ //payload length
+ payload.length+ //payload
+ TRIBES_MBR_END.length; //end pkg
}
/**
@@ -180,12 +196,14 @@
//you'd be surprised, but System.currentTimeMillis
//shows up on the profiler
long alive=System.currentTimeMillis()-getServiceStartTime();
- XByteBuffer.toBytes( (long) alive, dataPkg, 0);
+ XByteBuffer.toBytes( (long) alive, dataPkg, TRIBES_MBR_BEGIN.length+4);
}
return dataPkg;
}
//package looks like
+ //start package TRIBES_MBR_BEGIN.length
+ //package length - 4 bytes
//alive - 8 bytes
//port - 4 bytes
//host length - 1 byte
@@ -197,13 +215,26 @@
//uniqueId - 16 bytes
//payload length - 4 bytes
//payload plen bytes
+ //end package TRIBES_MBR_END.length
byte[] addr = host;
long alive=System.currentTimeMillis()-getServiceStartTime();
byte hl = (byte)addr.length;
byte[] data = new byte[getDataLength()];
+
+ int bodylength = (getDataLength() - TRIBES_MBR_BEGIN.length - TRIBES_MBR_END.length - 4);
+
int pos = 0;
+
+ //TRIBES_MBR_BEGIN
+ System.arraycopy(TRIBES_MBR_BEGIN,0,data,pos,TRIBES_MBR_BEGIN.length);
+ pos += TRIBES_MBR_BEGIN.length;
+
+ //body length
+ XByteBuffer.toBytes(bodylength,data,pos);
+ pos += 4;
+
//alive data
- XByteBuffer.toBytes((long)alive,data,0);
+ XByteBuffer.toBytes((long)alive,data,pos);
pos += 8;
//port
XByteBuffer.toBytes(port,data,pos);
@@ -233,6 +264,10 @@
pos+=4;
System.arraycopy(payload,0,data,pos,payload.length);
pos+=payload.length;
+
+ //TRIBES_MBR_END
+ System.arraycopy(TRIBES_MBR_END,0,data,pos,TRIBES_MBR_END.length);
+ pos += TRIBES_MBR_END.length;
//create local data
dataPkg = data;
@@ -248,69 +283,96 @@
}
public static MemberImpl getMember(byte[] data, int offset, int length, MemberImpl member) {
- //package looks like
- //alive - 8 bytes
- //port - 4 bytes
- //host length - 1 byte
- //host - hl bytes
- //command length - 4 bytes
- //command clen bytes
- //domain length - 4 bytes
- //domain - dlen bytes
- //uniqueId - 16 bytes
- //payload length - 4bytes
- //payload - pl bytes
- int pos = offset;
-
- byte[] alived = new byte[8];
- System.arraycopy(data, pos, alived, 0, 8);
- pos+=8;
- byte[] portd = new byte[4];
- System.arraycopy(data, pos, portd, 0, 4);
- pos+=4;
-
- byte hl = data[pos++];
- byte[] addr = new byte[hl];
- System.arraycopy(data, pos, addr, 0, hl);
- pos+=hl;
-
- int cl = XByteBuffer.toInt(data,pos);
- pos+=4;
-
- byte[] command = new byte[cl];
- System.arraycopy(data, pos, command, 0, command.length);
- pos+=command.length;
-
- int dl = XByteBuffer.toInt(data,pos);
- pos+=4;
-
- byte[] domain = new byte[dl];
- System.arraycopy(data, pos, domain, 0, domain.length);
- pos+=domain.length;
-
- byte[] uniqueId = new byte[16];
- System.arraycopy(data, pos, uniqueId, 0, 16);
- pos+=16;
-
- int pl = XByteBuffer.toInt(data,pos);
- pos+=4;
-
- byte[] payload = new byte[pl];
- System.arraycopy(data, pos, payload, 0, payload.length);
- pos+=payload.length;
-
- member.setHost(addr);
- member.setPort(XByteBuffer.toInt(portd, 0));
- member.setMemberAliveTime(XByteBuffer.toLong(alived, 0));
- member.setUniqueId(uniqueId);
- member.payload = payload;
- member.domain = domain;
- member.command = command;
-
- member.dataPkg = new byte[length];
- System.arraycopy(data,offset,member.dataPkg,0,length);
-
- return member;
+ //package looks like
+ //start package TRIBES_MBR_BEGIN.length
+ //package length - 4 bytes
+ //alive - 8 bytes
+ //port - 4 bytes
+ //host length - 1 byte
+ //host - hl bytes
+ //clen - 4 bytes
+ //command - clen bytes
+ //dlen - 4 bytes
+ //domain - dlen bytes
+ //uniqueId - 16 bytes
+ //payload length - 4 bytes
+ //payload plen bytes
+ //end package TRIBES_MBR_END.length
+
+ int pos = offset;
+
+ if (XByteBuffer.firstIndexOf(data,offset,TRIBES_MBR_BEGIN)!=pos) {
+ throw new IllegalArgumentException("Invalid package, should start with:"+org.apache.catalina.tribes.util.Arrays.toString(TRIBES_MBR_BEGIN));
+ }
+
+ if ( length < (TRIBES_MBR_BEGIN.length+4) ) {
+ throw new ArrayIndexOutOfBoundsException("Member package to small to validate.");
+ }
+
+ pos += TRIBES_MBR_BEGIN.length;
+
+ int bodylength = XByteBuffer.toInt(data,pos);
+ pos += 4;
+
+ if ( length < (bodylength+4+TRIBES_MBR_BEGIN.length+TRIBES_MBR_END.length) ) {
+ throw new ArrayIndexOutOfBoundsException("Not enough bytes in member package.");
+ }
+
+ int endpos = pos+bodylength;
+ if (XByteBuffer.firstIndexOf(data,endpos,TRIBES_MBR_END)!=endpos) {
+ throw new IllegalArgumentException("Invalid package, should end with:"+org.apache.catalina.tribes.util.Arrays.toString(TRIBES_MBR_END));
+ }
+
+
+ byte[] alived = new byte[8];
+ System.arraycopy(data, pos, alived, 0, 8);
+ pos += 8;
+ byte[] portd = new byte[4];
+ System.arraycopy(data, pos, portd, 0, 4);
+ pos += 4;
+
+ byte hl = data[pos++];
+ byte[] addr = new byte[hl];
+ System.arraycopy(data, pos, addr, 0, hl);
+ pos += hl;
+
+ int cl = XByteBuffer.toInt(data, pos);
+ pos += 4;
+
+ byte[] command = new byte[cl];
+ System.arraycopy(data, pos, command, 0, command.length);
+ pos += command.length;
+
+ int dl = XByteBuffer.toInt(data, pos);
+ pos += 4;
+
+ byte[] domain = new byte[dl];
+ System.arraycopy(data, pos, domain, 0, domain.length);
+ pos += domain.length;
+
+ byte[] uniqueId = new byte[16];
+ System.arraycopy(data, pos, uniqueId, 0, 16);
+ pos += 16;
+
+ int pl = XByteBuffer.toInt(data, pos);
+ pos += 4;
+
+ byte[] payload = new byte[pl];
+ System.arraycopy(data, pos, payload, 0, payload.length);
+ pos += payload.length;
+
+ member.setHost(addr);
+ member.setPort(XByteBuffer.toInt(portd, 0));
+ member.setMemberAliveTime(XByteBuffer.toLong(alived, 0));
+ member.setUniqueId(uniqueId);
+ member.payload = payload;
+ member.domain = domain;
+ member.command = command;
+
+ member.dataPkg = new byte[length];
+ System.arraycopy(data, offset, member.dataPkg, 0, length);
+
+ return member;
}
public static MemberImpl getMember(byte[] data) {
@@ -414,7 +476,7 @@
for (int i=0; data!=null && i<data.length; i++ ) {
buf.append(String.valueOf(data[i])).append(" ");
if ( i==max ) {
- buf.append("...");
+ buf.append("...("+data.length+")");
break;
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org