You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by fh...@apache.org on 2006/09/08 17:42:10 UTC

svn commit: r441543 [1/2] - in /tomcat/tc6.0.x/trunk: java/org/apache/catalina/tribes/ java/org/apache/catalina/tribes/membership/ java/org/apache/catalina/tribes/transport/ test/ test/org/ test/org/apache/ test/org/apache/catalina/ test/org/apache/cat...

Author: fhanik
Date: Fri Sep  8 08:42:08 2006
New Revision: 441543

URL: http://svn.apache.org/viewvc?view=rev&rev=441543
Log:
Adding in unit tests, yell at me if the location should be elsewhere


Added:
    tomcat/tc6.0.x/trunk/test/
    tomcat/tc6.0.x/trunk/test/org/
    tomcat/tc6.0.x/trunk/test/org/apache/
    tomcat/tc6.0.x/trunk/test/org/apache/catalina/
    tomcat/tc6.0.x/trunk/test/org/apache/catalina/tribes/
    tomcat/tc6.0.x/trunk/test/org/apache/catalina/tribes/test/
    tomcat/tc6.0.x/trunk/test/org/apache/catalina/tribes/test/TestNioSender.java
    tomcat/tc6.0.x/trunk/test/org/apache/catalina/tribes/test/TribesTestSuite.java
    tomcat/tc6.0.x/trunk/test/org/apache/catalina/tribes/test/channel/
    tomcat/tc6.0.x/trunk/test/org/apache/catalina/tribes/test/channel/ChannelStartStop.java
    tomcat/tc6.0.x/trunk/test/org/apache/catalina/tribes/test/channel/TestChannelOptionFlag.java
    tomcat/tc6.0.x/trunk/test/org/apache/catalina/tribes/test/channel/TestDataIntegrity.java
    tomcat/tc6.0.x/trunk/test/org/apache/catalina/tribes/test/channel/TestRemoteProcessException.java
    tomcat/tc6.0.x/trunk/test/org/apache/catalina/tribes/test/interceptors/
    tomcat/tc6.0.x/trunk/test/org/apache/catalina/tribes/test/interceptors/TestNonBlockingCoordinator.java
    tomcat/tc6.0.x/trunk/test/org/apache/catalina/tribes/test/interceptors/TestTwoPhaseCommit.java
    tomcat/tc6.0.x/trunk/test/org/apache/catalina/tribes/test/io/
    tomcat/tc6.0.x/trunk/test/org/apache/catalina/tribes/test/io/TestSenderConnections.java
    tomcat/tc6.0.x/trunk/test/org/apache/catalina/tribes/test/io/TestSerialization.java
    tomcat/tc6.0.x/trunk/test/org/apache/catalina/tribes/test/membership/
    tomcat/tc6.0.x/trunk/test/org/apache/catalina/tribes/test/membership/MemberSerialization.java
    tomcat/tc6.0.x/trunk/test/org/apache/catalina/tribes/test/membership/TestDomainFilter.java
    tomcat/tc6.0.x/trunk/test/org/apache/catalina/tribes/test/membership/TestMemberArrival.java
    tomcat/tc6.0.x/trunk/test/org/apache/catalina/tribes/test/membership/TestTcpFailureDetector.java
    tomcat/tc6.0.x/trunk/test/org/apache/catalina/tribes/test/transport/
    tomcat/tc6.0.x/trunk/test/org/apache/catalina/tribes/test/transport/SocketNioReceive.java
    tomcat/tc6.0.x/trunk/test/org/apache/catalina/tribes/test/transport/SocketNioSend.java
    tomcat/tc6.0.x/trunk/test/org/apache/catalina/tribes/test/transport/SocketNioValidateSend.java
    tomcat/tc6.0.x/trunk/test/org/apache/catalina/tribes/test/transport/SocketReceive.java
    tomcat/tc6.0.x/trunk/test/org/apache/catalina/tribes/test/transport/SocketSend.java
    tomcat/tc6.0.x/trunk/test/org/apache/catalina/tribes/test/transport/SocketTribesReceive.java
    tomcat/tc6.0.x/trunk/test/org/apache/catalina/tribes/test/transport/SocketValidateReceive.java
Modified:
    tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/ChannelReceiver.java
    tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/Member.java
    tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/membership/MemberImpl.java
    tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/ReceiverBase.java

Modified: tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/ChannelReceiver.java
URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/ChannelReceiver.java?view=diff&rev=441543&r1=441542&r2=441543
==============================================================================
--- tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/ChannelReceiver.java (original)
+++ tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/ChannelReceiver.java Fri Sep  8 08:42:08 2006
@@ -52,6 +52,12 @@
     public int getPort();
     
     /**
+     * Returns the secure listening port
+     * @return port, -1 if a secure port is not activated
+     */
+    public int getSecurePort();
+    
+    /**
      * Sets the message listener to receive notification of incoming
      * @param listener MessageListener
      * @see MessageListener

Modified: tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/Member.java
URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/Member.java?view=diff&rev=441543&r1=441542&r2=441543
==============================================================================
--- tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/Member.java (original)
+++ tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/Member.java Fri Sep  8 08:42:08 2006
@@ -52,10 +52,19 @@
 
     /**
      * Returns the listen port for the ChannelReceiver implementation
-     * @return IPv4 or IPv6 representation of the host address this member listens to incoming data
+     * @return the listen port for this member, -1 if its not listening on an unsecure port
      * @see ChannelReceiver
      */
     public int getPort();
+    
+    /**
+     * Returns the secure listen port for the ChannelReceiver implementation.
+     * Returns -1 if its not listening to a secure port.
+     * @return the listen port for this member, -1 if its not listening on a secure port
+     * @see ChannelReceiver
+     */
+    public int getSecurePort();
+    
 
     /**
      * Contains information on how long this member has been online.

Modified: tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/membership/MemberImpl.java
URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/membership/MemberImpl.java?view=diff&rev=441543&r1=441542&r2=441543
==============================================================================
--- tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/membership/MemberImpl.java (original)
+++ tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/membership/MemberImpl.java Fri Sep  8 08:42:08 2006
@@ -1,570 +1,596 @@
-/*
- * Copyright 1999,2004-2005 The Apache Software Foundation.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.catalina.tribes.membership;
-
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.util.Arrays;
-
-import org.apache.catalina.tribes.Member;
-import org.apache.catalina.tribes.io.XByteBuffer;
-import org.apache.catalina.tribes.transport.SenderState;
-
-/**
- * A <b>membership</b> implementation using simple multicast.
- * This is the representation of a multicast member.
- * Carries the host, and port of the this or other cluster nodes.
- *
- * @author Filip Hanik
- * @version $Revision: 304032 $, $Date: 2005-07-27 10:11:55 -0500 (Wed, 27 Jul 2005) $
- */
-public class MemberImpl implements Member, java.io.Externalizable {
-
-    /**
-     * Public properties specific to this implementation
-     */
-    public static final transient String TCP_LISTEN_PORT = "tcpListenPort";
-    public static final transient String TCP_LISTEN_HOST = "tcpListenHost";
-    public static final transient String MEMBER_NAME = "memberName";
-    
-    public static final transient byte[] TRIBES_MBR_BEGIN = new byte[] {84, 82, 73, 66, 69, 83, 45, 66};
-    public static final transient byte[] TRIBES_MBR_END   = new byte[] {84, 82, 73, 66, 69, 83, 45, 69};
-    
-    /**
-     * The listen host for this member
-     */
-    protected byte[] host;
-    protected transient String hostname;
-    /**
-     * The tcp listen port for this member
-     */
-    protected int port;
-
-    /**
-     * Counter for how many broadcast messages have been sent from this member
-     */
-    protected int msgCount = 0;
-    /**
-     * The number of milliseconds since this members was
-     * created, is kept track of using the start time
-     */
-    protected long memberAliveTime = 0;
-    
-    /**
-     * For the local member only
-     */
-    protected transient long serviceStartTime;
-    
-    /**
-     * To avoid serialization over and over again, once the local dataPkg
-     * has been set, we use that to transmit data
-     */
-    protected transient byte[] dataPkg = null;
-
-    /**
-     * Unique session Id for this member
-     */
-    protected byte[] uniqueId = new byte[16];
-    
-    /**
-     * Custom payload that an app framework can broadcast
-     * Also used to transport stop command.
-     */
-    protected byte[] payload = new byte[0];
-    
-    /**
-     * Command, so that the custom payload doesn't have to be used
-     * This is for internal tribes use, such as SHUTDOWN_COMMAND
-     */
-    protected byte[] command = new byte[0];
-
-    /**
-     * Domain if we want to filter based on domain.
-     */
-    protected byte[] domain = new byte[0];
-    
-    /**
-     * Empty constructor for serialization
-     */
-    public MemberImpl() {
-        
-    }
-
-    /**
-     * Construct a new member object
-     * @param name - the name of this member, cluster unique
-     * @param domain - the cluster domain name of this member
-     * @param host - the tcp listen host
-     * @param port - the tcp listen port
-     */
-    public MemberImpl(String host,
-                      int port,
-                      long aliveTime) throws IOException {
-        setHostname(host);
-        this.port = port;
-        this.memberAliveTime=aliveTime;
-    }
-    
-    public MemberImpl(String host,
-                      int port,
-                      long aliveTime,
-                      byte[] payload) throws IOException {
-        this(host,port,aliveTime);
-        setPayload(payload);
-    }
-    
-    public boolean isReady() {
-        return SenderState.getSenderState(this).isReady();
-    }
-    public boolean isSuspect() {
-        return SenderState.getSenderState(this).isSuspect();
-    }
-    public boolean isFailing() {
-        return SenderState.getSenderState(this).isFailing();
-    }
-
-    /**
-     * Increment the message count.
-     */
-    protected void inc() {
-        msgCount++;
-    }
-
-    /**
-     * Create a data package to send over the wire representing this member.
-     * This is faster than serialization.
-     * @return - the bytes for this member deserialized
-     * @throws Exception
-     */
-    public byte[] getData()  {
-        return getData(true);
-    }
-    /**
-     * Highly optimized version of serializing a member into a byte array
-     * Returns a cached byte[] reference, do not modify this data
-     * @param getalive boolean
-     * @return byte[]
-     */
-    public byte[] getData(boolean getalive)  {
-        return getData(getalive,false);
-    }
-    
-    
-    public int getDataLength() {
-        return TRIBES_MBR_BEGIN.length+ //start pkg
-               4+ //data length
-               8+ //alive time
-               4+ //port
-               1+ //host length
-               host.length+ //host
-               4+ //command length
-               command.length+ //command
-               4+ //domain length
-               domain.length+ //domain
-               16+ //unique id
-               4+ //payload length
-               payload.length+ //payload
-               TRIBES_MBR_END.length; //end pkg
-    }
-    
-    /**
-     * 
-     * @param getalive boolean - calculate memberAlive time
-     * @param reset boolean - reset the cached data package, and create a new one
-     * @return byte[]
-     */
-    public byte[] getData(boolean getalive, boolean reset)  {
-        if ( reset ) dataPkg = null;
-        //look in cache first
-        if ( dataPkg!=null ) {
-            if ( getalive ) {
-                //you'd be surprised, but System.currentTimeMillis
-                //shows up on the profiler
-                long alive=System.currentTimeMillis()-getServiceStartTime();
-                XByteBuffer.toBytes( (long) alive, dataPkg, TRIBES_MBR_BEGIN.length+4);
-            }
-            return dataPkg;
-        }
-        
-        //package looks like
-        //start package TRIBES_MBR_BEGIN.length
-        //package length - 4 bytes
-        //alive - 8 bytes
-        //port - 4 bytes
-        //host length - 1 byte
-        //host - hl bytes
-        //clen - 4 bytes
-        //command - clen bytes
-        //dlen - 4 bytes
-        //domain - dlen bytes
-        //uniqueId - 16 bytes
-        //payload length - 4 bytes
-        //payload plen bytes
-        //end package TRIBES_MBR_END.length
-        byte[] addr = host;
-        long alive=System.currentTimeMillis()-getServiceStartTime();
-        byte hl = (byte)addr.length;
-        byte[] data = new byte[getDataLength()];
-        
-        int bodylength = (getDataLength() - TRIBES_MBR_BEGIN.length - TRIBES_MBR_END.length - 4);
-        
-        int pos = 0;
-        
-        //TRIBES_MBR_BEGIN
-        System.arraycopy(TRIBES_MBR_BEGIN,0,data,pos,TRIBES_MBR_BEGIN.length);
-        pos += TRIBES_MBR_BEGIN.length;
-        
-        //body length
-        XByteBuffer.toBytes(bodylength,data,pos);
-        pos += 4;
-        
-        //alive data
-        XByteBuffer.toBytes((long)alive,data,pos);
-        pos += 8;
-        //port
-        XByteBuffer.toBytes(port,data,pos);
-        pos += 4;
-        //host length
-        data[pos++] = hl;
-        //host
-        System.arraycopy(addr,0,data,pos,addr.length);
-        pos+=addr.length;
-        //clen - 4 bytes
-        XByteBuffer.toBytes(command.length,data,pos);
-        pos+=4;
-        //command - clen bytes
-        System.arraycopy(command,0,data,pos,command.length);
-        pos+=command.length;
-        //dlen - 4 bytes
-        XByteBuffer.toBytes(domain.length,data,pos);
-        pos+=4;
-        //domain - dlen bytes
-        System.arraycopy(domain,0,data,pos,domain.length);
-        pos+=domain.length;
-        //unique Id
-        System.arraycopy(uniqueId,0,data,pos,uniqueId.length);
-        pos+=uniqueId.length;
-        //payload
-        XByteBuffer.toBytes(payload.length,data,pos);
-        pos+=4;
-        System.arraycopy(payload,0,data,pos,payload.length);
-        pos+=payload.length;
-        
-        //TRIBES_MBR_END
-        System.arraycopy(TRIBES_MBR_END,0,data,pos,TRIBES_MBR_END.length);
-        pos += TRIBES_MBR_END.length;
-
-        //create local data
-        dataPkg = data;
-        return data;
-    }
-    /**
-     * Deserializes a member from data sent over the wire
-     * @param data - the bytes received
-     * @return a member object.
-     */
-    public static MemberImpl getMember(byte[] data, MemberImpl member) {
-        return getMember(data,0,data.length,member);
-    }
-
-    public static MemberImpl getMember(byte[] data, int offset, int length, MemberImpl member) {
-        //package looks like
-        //start package TRIBES_MBR_BEGIN.length
-        //package length - 4 bytes
-        //alive - 8 bytes
-        //port - 4 bytes
-        //host length - 1 byte
-        //host - hl bytes
-        //clen - 4 bytes
-        //command - clen bytes
-        //dlen - 4 bytes
-        //domain - dlen bytes
-        //uniqueId - 16 bytes
-        //payload length - 4 bytes
-        //payload plen bytes
-        //end package TRIBES_MBR_END.length
-
-        int pos = offset;
-        
-        if (XByteBuffer.firstIndexOf(data,offset,TRIBES_MBR_BEGIN)!=pos) {
-            throw new IllegalArgumentException("Invalid package, should start with:"+org.apache.catalina.tribes.util.Arrays.toString(TRIBES_MBR_BEGIN));
-        }
-
-        if ( length < (TRIBES_MBR_BEGIN.length+4) ) {
-            throw new ArrayIndexOutOfBoundsException("Member package to small to validate.");
-        }
-        
-        pos += TRIBES_MBR_BEGIN.length;
-        
-        int bodylength = XByteBuffer.toInt(data,pos);
-        pos += 4;
-        
-        if ( length < (bodylength+4+TRIBES_MBR_BEGIN.length+TRIBES_MBR_END.length) ) {
-            throw new ArrayIndexOutOfBoundsException("Not enough bytes in member package.");
-        }
-        
-        int endpos = pos+bodylength;
-        if (XByteBuffer.firstIndexOf(data,endpos,TRIBES_MBR_END)!=endpos) {
-            throw new IllegalArgumentException("Invalid package, should end with:"+org.apache.catalina.tribes.util.Arrays.toString(TRIBES_MBR_END));
-        }
-
-
-        byte[] alived = new byte[8];
-        System.arraycopy(data, pos, alived, 0, 8);
-        pos += 8;
-        byte[] portd = new byte[4];
-        System.arraycopy(data, pos, portd, 0, 4);
-        pos += 4;
-    
-        byte hl = data[pos++];
-        byte[] addr = new byte[hl];
-        System.arraycopy(data, pos, addr, 0, hl);
-        pos += hl;
-    
-        int cl = XByteBuffer.toInt(data, pos);
-        pos += 4;
-    
-        byte[] command = new byte[cl];
-        System.arraycopy(data, pos, command, 0, command.length);
-        pos += command.length;
-    
-        int dl = XByteBuffer.toInt(data, pos);
-        pos += 4;
-    
-        byte[] domain = new byte[dl];
-        System.arraycopy(data, pos, domain, 0, domain.length);
-        pos += domain.length;
-    
-        byte[] uniqueId = new byte[16];
-        System.arraycopy(data, pos, uniqueId, 0, 16);
-        pos += 16;
-    
-        int pl = XByteBuffer.toInt(data, pos);
-        pos += 4;
-    
-        byte[] payload = new byte[pl];
-        System.arraycopy(data, pos, payload, 0, payload.length);
-        pos += payload.length;
-    
-        member.setHost(addr);
-        member.setPort(XByteBuffer.toInt(portd, 0));
-        member.setMemberAliveTime(XByteBuffer.toLong(alived, 0));
-        member.setUniqueId(uniqueId);
-        member.payload = payload;
-        member.domain = domain;
-        member.command = command;
-    
-        member.dataPkg = new byte[length];
-        System.arraycopy(data, offset, member.dataPkg, 0, length);
-    
-        return member;
-    }
-
-    public static MemberImpl getMember(byte[] data) {
-       return getMember(data,new MemberImpl());
-    }
-
-    /**
-     * Return the name of this object
-     * @return a unique name to the cluster
-     */
-    public String getName() {
-        return "tcp://"+getHostname()+":"+getPort();
-    }
-    
-    /**
-     * Return the listen port of this member
-     * @return - tcp listen port
-     */
-    public int getPort()  {
-        return this.port;
-    }
-
-    /**
-     * Return the TCP listen host for this member
-     * @return IP address or host name
-     */
-    public byte[] getHost()  {
-        return host;
-    }
-    
-    public String getHostname() {
-        if ( this.hostname != null ) return hostname;
-        else {
-            try {
-                this.hostname = java.net.InetAddress.getByAddress(host).getHostName();
-                return this.hostname;
-            }catch ( IOException x ) {
-                throw new RuntimeException("Unable to parse hostname.",x);
-            }
-        }
-    }
-
-    /**
-     * Contains information on how long this member has been online.
-     * The result is the number of milli seconds this member has been
-     * broadcasting its membership to the cluster.
-     * @return nr of milliseconds since this member started.
-     */
-    public long getMemberAliveTime() {
-       return memberAliveTime;
-    }
-
-    public long getServiceStartTime() {
-        return serviceStartTime;
-    }
-
-    public byte[] getUniqueId() {
-        return uniqueId;
-    }
-
-    public byte[] getPayload() {
-        return payload;
-    }
-
-    public byte[] getCommand() {
-        return command;
-    }
-
-    public byte[] getDomain() {
-        return domain;
-    }
-
-    public void setMemberAliveTime(long time) {
-       memberAliveTime=time;
-    }
-
-
-
-    /**
-     * String representation of this object
-     */
-    public String toString()  {
-        StringBuffer buf = new StringBuffer("org.apache.catalina.tribes.membership.MemberImpl[");
-        buf.append(getName()).append(",");
-        buf.append(getHostname()).append(",");
-        buf.append(port).append(", alive=");
-        buf.append(memberAliveTime).append(",");
-        buf.append("id=").append(bToS(this.uniqueId)).append(", ");
-        buf.append("payload=").append(bToS(this.payload,8)).append(", ");
-        buf.append("command=").append(bToS(this.command,8)).append(", ");
-        buf.append("domain=").append(bToS(this.domain,8)).append(", ");
-        buf.append("]");
-        return buf.toString();
-    }
-    public static String bToS(byte[] data) {
-        return bToS(data,data.length);
-    }
-    public static String bToS(byte[] data, int max) {
-        StringBuffer buf = new StringBuffer(4*16);
-        buf.append("{");
-        for (int i=0; data!=null && i<data.length; i++ ) {
-            buf.append(String.valueOf(data[i])).append(" ");
-            if ( i==max ) {
-                buf.append("...("+data.length+")");
-                break;
-            }
-        }
-        buf.append("}");
-        return buf.toString();
-    }
-
-    /**
-     * @see java.lang.Object#hashCode()
-     * @return The hash code
-     */
-    public int hashCode() {
-        return getHost()[0]+getHost()[1]+getHost()[2]+getHost()[3];
-    }
-
-    /**
-     * Returns true if the param o is a McastMember with the same name
-     * @param o
-     */
-    public boolean equals(Object o) {
-        if ( o instanceof MemberImpl )    {
-            return Arrays.equals(this.getHost(),((MemberImpl)o).getHost()) &&
-                   this.getPort() == ((MemberImpl)o).getPort() &&
-                   Arrays.equals(this.getUniqueId(),((MemberImpl)o).getUniqueId());
-        }
-        else
-            return false;
-    }
-    
-    public void setHost(byte[] host) {
-        this.host = host;
-    }
-    
-    public void setHostname(String host) throws IOException {
-        hostname = host;
-        this.host = java.net.InetAddress.getByName(host).getAddress();
-    }
-    
-    public void setMsgCount(int msgCount) {
-        this.msgCount = msgCount;
-    }
-
-    public void setPort(int port) {
-        this.port = port;
-        this.dataPkg = null;
-    }
-
-    public void setServiceStartTime(long serviceStartTime) {
-        this.serviceStartTime = serviceStartTime;
-    }
-
-    public void setUniqueId(byte[] uniqueId) {
-        this.uniqueId = uniqueId!=null?uniqueId:new byte[16];
-        getData(true,true);
-    }
-
-    public void setPayload(byte[] payload) {
-        byte[] oldpayload = this.payload;
-        this.payload = payload!=null?payload:new byte[0];
-        if ( this.getData(true,true).length > McastServiceImpl.MAX_PACKET_SIZE ) {
-            this.payload = oldpayload;
-            throw new IllegalArgumentException("Payload is to large for tribes to handle.");
-        }
-        
-    }
-
-    public void setCommand(byte[] command) {
-        this.command = command!=null?command:new byte[0];
-        getData(true,true);
-    }
-
-    public void setDomain(byte[] domain) {
-        this.domain = domain!=null?domain:new byte[0];
-        getData(true,true);
-    }
-
-    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-        int length = in.readInt();
-        byte[] message = new byte[length];
-        in.read(message);
-        getMember(message,this);
-        
-    }
-
-    public void writeExternal(ObjectOutput out) throws IOException {
-        byte[] data = this.getData();
-        out.writeInt(data.length);
-        out.write(data);
-    }
-    
-}
+/*
+ * Copyright 1999,2004-2005 The Apache Software Foundation.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.catalina.tribes.membership;
+
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.Arrays;
+
+import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.io.XByteBuffer;
+import org.apache.catalina.tribes.transport.SenderState;
+
+/**
+ * A <b>membership</b> implementation using simple multicast.
+ * This is the representation of a multicast member.
+ * Carries the host, and port of the this or other cluster nodes.
+ *
+ * @author Filip Hanik
+ * @version $Revision: 304032 $, $Date: 2005-07-27 10:11:55 -0500 (Wed, 27 Jul 2005) $
+ */
+public class MemberImpl implements Member, java.io.Externalizable {
+
+    /**
+     * Public properties specific to this implementation
+     */
+    public static final transient String TCP_LISTEN_PORT = "tcpListenPort";
+    public static final transient String TCP_LISTEN_HOST = "tcpListenHost";
+    public static final transient String MEMBER_NAME = "memberName";
+    
+    public static final transient byte[] TRIBES_MBR_BEGIN = new byte[] {84, 82, 73, 66, 69, 83, 45, 66};
+    public static final transient byte[] TRIBES_MBR_END   = new byte[] {84, 82, 73, 66, 69, 83, 45, 69};
+    
+    /**
+     * The listen host for this member
+     */
+    protected byte[] host;
+    protected transient String hostname;
+    /**
+     * The tcp listen port for this member
+     */
+    protected int port;
+    
+    /**
+     * The tcp/SSL listen port for this member
+     */
+    protected int securePort = -1;
+
+    /**
+     * Counter for how many broadcast messages have been sent from this member
+     */
+    protected int msgCount = 0;
+    /**
+     * The number of milliseconds since this members was
+     * created, is kept track of using the start time
+     */
+    protected long memberAliveTime = 0;
+    
+    /**
+     * For the local member only
+     */
+    protected transient long serviceStartTime;
+    
+    /**
+     * To avoid serialization over and over again, once the local dataPkg
+     * has been set, we use that to transmit data
+     */
+    protected transient byte[] dataPkg = null;
+
+    /**
+     * Unique session Id for this member
+     */
+    protected byte[] uniqueId = new byte[16];
+    
+    /**
+     * Custom payload that an app framework can broadcast
+     * Also used to transport stop command.
+     */
+    protected byte[] payload = new byte[0];
+    
+    /**
+     * Command, so that the custom payload doesn't have to be used
+     * This is for internal tribes use, such as SHUTDOWN_COMMAND
+     */
+    protected byte[] command = new byte[0];
+
+    /**
+     * Domain if we want to filter based on domain.
+     */
+    protected byte[] domain = new byte[0];
+    
+    /**
+     * Empty constructor for serialization
+     */
+    public MemberImpl() {
+        
+    }
+
+    /**
+     * Construct a new member object
+     * @param name - the name of this member, cluster unique
+     * @param domain - the cluster domain name of this member
+     * @param host - the tcp listen host
+     * @param port - the tcp listen port
+     */
+    public MemberImpl(String host,
+                      int port,
+                      long aliveTime) throws IOException {
+        setHostname(host);
+        this.port = port;
+        this.memberAliveTime=aliveTime;
+    }
+    
+    public MemberImpl(String host,
+                      int port,
+                      long aliveTime,
+                      byte[] payload) throws IOException {
+        this(host,port,aliveTime);
+        setPayload(payload);
+    }
+    
+    public boolean isReady() {
+        return SenderState.getSenderState(this).isReady();
+    }
+    public boolean isSuspect() {
+        return SenderState.getSenderState(this).isSuspect();
+    }
+    public boolean isFailing() {
+        return SenderState.getSenderState(this).isFailing();
+    }
+
+    /**
+     * Increment the message count.
+     */
+    protected void inc() {
+        msgCount++;
+    }
+
+    /**
+     * Create a data package to send over the wire representing this member.
+     * This is faster than serialization.
+     * @return - the bytes for this member deserialized
+     * @throws Exception
+     */
+    public byte[] getData()  {
+        return getData(true);
+    }
+    /**
+     * Highly optimized version of serializing a member into a byte array
+     * Returns a cached byte[] reference, do not modify this data
+     * @param getalive boolean
+     * @return byte[]
+     */
+    public byte[] getData(boolean getalive)  {
+        return getData(getalive,false);
+    }
+    
+    
+    public int getDataLength() {
+        return TRIBES_MBR_BEGIN.length+ //start pkg
+               4+ //data length
+               8+ //alive time
+               4+ //port
+               4+ //secure port
+               1+ //host length
+               host.length+ //host
+               4+ //command length
+               command.length+ //command
+               4+ //domain length
+               domain.length+ //domain
+               16+ //unique id
+               4+ //payload length
+               payload.length+ //payload
+               TRIBES_MBR_END.length; //end pkg
+    }
+    
+    /**
+     * 
+     * @param getalive boolean - calculate memberAlive time
+     * @param reset boolean - reset the cached data package, and create a new one
+     * @return byte[]
+     */
+    public byte[] getData(boolean getalive, boolean reset)  {
+        if ( reset ) dataPkg = null;
+        //look in cache first
+        if ( dataPkg!=null ) {
+            if ( getalive ) {
+                //you'd be surprised, but System.currentTimeMillis
+                //shows up on the profiler
+                long alive=System.currentTimeMillis()-getServiceStartTime();
+                XByteBuffer.toBytes( (long) alive, dataPkg, TRIBES_MBR_BEGIN.length+4);
+            }
+            return dataPkg;
+        }
+        
+        //package looks like
+        //start package TRIBES_MBR_BEGIN.length
+        //package length - 4 bytes
+        //alive - 8 bytes
+        //port - 4 bytes
+        //secure port - 4 bytes
+        //host length - 1 byte
+        //host - hl bytes
+        //clen - 4 bytes
+        //command - clen bytes
+        //dlen - 4 bytes
+        //domain - dlen bytes
+        //uniqueId - 16 bytes
+        //payload length - 4 bytes
+        //payload plen bytes
+        //end package TRIBES_MBR_END.length
+        byte[] addr = host;
+        long alive=System.currentTimeMillis()-getServiceStartTime();
+        byte hl = (byte)addr.length;
+        byte[] data = new byte[getDataLength()];
+        
+        int bodylength = (getDataLength() - TRIBES_MBR_BEGIN.length - TRIBES_MBR_END.length - 4);
+        
+        int pos = 0;
+        
+        //TRIBES_MBR_BEGIN
+        System.arraycopy(TRIBES_MBR_BEGIN,0,data,pos,TRIBES_MBR_BEGIN.length);
+        pos += TRIBES_MBR_BEGIN.length;
+        
+        //body length
+        XByteBuffer.toBytes(bodylength,data,pos);
+        pos += 4;
+        
+        //alive data
+        XByteBuffer.toBytes((long)alive,data,pos);
+        pos += 8;
+        //port
+        XByteBuffer.toBytes(port,data,pos);
+        pos += 4;
+        //secure port
+        XByteBuffer.toBytes(securePort,data,pos);
+        pos += 4;
+        //host length
+        data[pos++] = hl;
+        //host
+        System.arraycopy(addr,0,data,pos,addr.length);
+        pos+=addr.length;
+        //clen - 4 bytes
+        XByteBuffer.toBytes(command.length,data,pos);
+        pos+=4;
+        //command - clen bytes
+        System.arraycopy(command,0,data,pos,command.length);
+        pos+=command.length;
+        //dlen - 4 bytes
+        XByteBuffer.toBytes(domain.length,data,pos);
+        pos+=4;
+        //domain - dlen bytes
+        System.arraycopy(domain,0,data,pos,domain.length);
+        pos+=domain.length;
+        //unique Id
+        System.arraycopy(uniqueId,0,data,pos,uniqueId.length);
+        pos+=uniqueId.length;
+        //payload
+        XByteBuffer.toBytes(payload.length,data,pos);
+        pos+=4;
+        System.arraycopy(payload,0,data,pos,payload.length);
+        pos+=payload.length;
+        
+        //TRIBES_MBR_END
+        System.arraycopy(TRIBES_MBR_END,0,data,pos,TRIBES_MBR_END.length);
+        pos += TRIBES_MBR_END.length;
+
+        //create local data
+        dataPkg = data;
+        return data;
+    }
+    /**
+     * Deserializes a member from data sent over the wire
+     * @param data - the bytes received
+     * @return a member object.
+     */
+    public static MemberImpl getMember(byte[] data, MemberImpl member) {
+        return getMember(data,0,data.length,member);
+    }
+
+    public static MemberImpl getMember(byte[] data, int offset, int length, MemberImpl member) {
+        //package looks like
+        //start package TRIBES_MBR_BEGIN.length
+        //package length - 4 bytes
+        //alive - 8 bytes
+        //port - 4 bytes
+        //secure port - 4 bytes
+        //host length - 1 byte
+        //host - hl bytes
+        //clen - 4 bytes
+        //command - clen bytes
+        //dlen - 4 bytes
+        //domain - dlen bytes
+        //uniqueId - 16 bytes
+        //payload length - 4 bytes
+        //payload plen bytes
+        //end package TRIBES_MBR_END.length
+
+        int pos = offset;
+        
+        if (XByteBuffer.firstIndexOf(data,offset,TRIBES_MBR_BEGIN)!=pos) {
+            throw new IllegalArgumentException("Invalid package, should start with:"+org.apache.catalina.tribes.util.Arrays.toString(TRIBES_MBR_BEGIN));
+        }
+
+        if ( length < (TRIBES_MBR_BEGIN.length+4) ) {
+            throw new ArrayIndexOutOfBoundsException("Member package to small to validate.");
+        }
+        
+        pos += TRIBES_MBR_BEGIN.length;
+        
+        int bodylength = XByteBuffer.toInt(data,pos);
+        pos += 4;
+        
+        if ( length < (bodylength+4+TRIBES_MBR_BEGIN.length+TRIBES_MBR_END.length) ) {
+            throw new ArrayIndexOutOfBoundsException("Not enough bytes in member package.");
+        }
+        
+        int endpos = pos+bodylength;
+        if (XByteBuffer.firstIndexOf(data,endpos,TRIBES_MBR_END)!=endpos) {
+            throw new IllegalArgumentException("Invalid package, should end with:"+org.apache.catalina.tribes.util.Arrays.toString(TRIBES_MBR_END));
+        }
+
+
+        byte[] alived = new byte[8];
+        System.arraycopy(data, pos, alived, 0, 8);
+        pos += 8;
+        byte[] portd = new byte[4];
+        System.arraycopy(data, pos, portd, 0, 4);
+        pos += 4;
+        
+        byte[] sportd = new byte[4];
+        System.arraycopy(data, pos, sportd, 0, 4);
+        pos += 4;
+
+
+    
+        byte hl = data[pos++];
+        byte[] addr = new byte[hl];
+        System.arraycopy(data, pos, addr, 0, hl);
+        pos += hl;
+    
+        int cl = XByteBuffer.toInt(data, pos);
+        pos += 4;
+    
+        byte[] command = new byte[cl];
+        System.arraycopy(data, pos, command, 0, command.length);
+        pos += command.length;
+    
+        int dl = XByteBuffer.toInt(data, pos);
+        pos += 4;
+    
+        byte[] domain = new byte[dl];
+        System.arraycopy(data, pos, domain, 0, domain.length);
+        pos += domain.length;
+    
+        byte[] uniqueId = new byte[16];
+        System.arraycopy(data, pos, uniqueId, 0, 16);
+        pos += 16;
+    
+        int pl = XByteBuffer.toInt(data, pos);
+        pos += 4;
+    
+        byte[] payload = new byte[pl];
+        System.arraycopy(data, pos, payload, 0, payload.length);
+        pos += payload.length;
+    
+        member.setHost(addr);
+        member.setPort(XByteBuffer.toInt(portd, 0));
+        member.setSecurePort(XByteBuffer.toInt(sportd, 0));
+        member.setMemberAliveTime(XByteBuffer.toLong(alived, 0));
+        member.setUniqueId(uniqueId);
+        member.payload = payload;
+        member.domain = domain;
+        member.command = command;
+    
+        member.dataPkg = new byte[length];
+        System.arraycopy(data, offset, member.dataPkg, 0, length);
+    
+        return member;
+    }
+
+    public static MemberImpl getMember(byte[] data) {
+       return getMember(data,new MemberImpl());
+    }
+
+    /**
+     * Return the name of this object
+     * @return a unique name to the cluster
+     */
+    public String getName() {
+        return "tcp://"+getHostname()+":"+getPort();
+    }
+    
+    /**
+     * Return the listen port of this member
+     * @return - tcp listen port
+     */
+    public int getPort()  {
+        return this.port;
+    }
+
+    /**
+     * Return the TCP listen host for this member
+     * @return IP address or host name
+     */
+    public byte[] getHost()  {
+        return host;
+    }
+    
+    public String getHostname() {
+        if ( this.hostname != null ) return hostname;
+        else {
+            try {
+                this.hostname = java.net.InetAddress.getByAddress(host).getHostName();
+                return this.hostname;
+            }catch ( IOException x ) {
+                throw new RuntimeException("Unable to parse hostname.",x);
+            }
+        }
+    }
+
+    /**
+     * Contains information on how long this member has been online.
+     * The result is the number of milli seconds this member has been
+     * broadcasting its membership to the cluster.
+     * @return nr of milliseconds since this member started.
+     */
+    public long getMemberAliveTime() {
+       return memberAliveTime;
+    }
+
+    public long getServiceStartTime() {
+        return serviceStartTime;
+    }
+
+    public byte[] getUniqueId() {
+        return uniqueId;
+    }
+
+    public byte[] getPayload() {
+        return payload;
+    }
+
+    public byte[] getCommand() {
+        return command;
+    }
+
+    public byte[] getDomain() {
+        return domain;
+    }
+
+    public int getSecurePort() {
+        return securePort;
+    }
+
+    public void setMemberAliveTime(long time) {
+       memberAliveTime=time;
+    }
+
+
+
+    /**
+     * String representation of this object
+     */
+    public String toString()  {
+        StringBuffer buf = new StringBuffer("org.apache.catalina.tribes.membership.MemberImpl[");
+        buf.append(getName()).append(",");
+        buf.append(getHostname()).append(",");
+        buf.append(port).append(", alive=");
+        buf.append(memberAliveTime).append(",");
+        buf.append("id=").append(bToS(this.uniqueId)).append(", ");
+        buf.append("payload=").append(bToS(this.payload,8)).append(", ");
+        buf.append("command=").append(bToS(this.command,8)).append(", ");
+        buf.append("domain=").append(bToS(this.domain,8)).append(", ");
+        buf.append("]");
+        return buf.toString();
+    }
+    public static String bToS(byte[] data) {
+        return bToS(data,data.length);
+    }
+    public static String bToS(byte[] data, int max) {
+        StringBuffer buf = new StringBuffer(4*16);
+        buf.append("{");
+        for (int i=0; data!=null && i<data.length; i++ ) {
+            buf.append(String.valueOf(data[i])).append(" ");
+            if ( i==max ) {
+                buf.append("...("+data.length+")");
+                break;
+            }
+        }
+        buf.append("}");
+        return buf.toString();
+    }
+
+    /**
+     * @see java.lang.Object#hashCode()
+     * @return The hash code
+     */
+    public int hashCode() {
+        return getHost()[0]+getHost()[1]+getHost()[2]+getHost()[3];
+    }
+
+    /**
+     * Returns true if the param o is a McastMember with the same name
+     * @param o
+     */
+    public boolean equals(Object o) {
+        if ( o instanceof MemberImpl )    {
+            return Arrays.equals(this.getHost(),((MemberImpl)o).getHost()) &&
+                   this.getPort() == ((MemberImpl)o).getPort() &&
+                   Arrays.equals(this.getUniqueId(),((MemberImpl)o).getUniqueId());
+        }
+        else
+            return false;
+    }
+    
+    public void setHost(byte[] host) {
+        this.host = host;
+    }
+    
+    public void setHostname(String host) throws IOException {
+        hostname = host;
+        this.host = java.net.InetAddress.getByName(host).getAddress();
+    }
+    
+    public void setMsgCount(int msgCount) {
+        this.msgCount = msgCount;
+    }
+
+    public void setPort(int port) {
+        this.port = port;
+        this.dataPkg = null;
+    }
+
+    public void setServiceStartTime(long serviceStartTime) {
+        this.serviceStartTime = serviceStartTime;
+    }
+
+    public void setUniqueId(byte[] uniqueId) {
+        this.uniqueId = uniqueId!=null?uniqueId:new byte[16];
+        getData(true,true);
+    }
+
+    public void setPayload(byte[] payload) {
+        byte[] oldpayload = this.payload;
+        this.payload = payload!=null?payload:new byte[0];
+        if ( this.getData(true,true).length > McastServiceImpl.MAX_PACKET_SIZE ) {
+            this.payload = oldpayload;
+            throw new IllegalArgumentException("Payload is to large for tribes to handle.");
+        }
+        
+    }
+
+    public void setCommand(byte[] command) {
+        this.command = command!=null?command:new byte[0];
+        getData(true,true);
+    }
+
+    public void setDomain(byte[] domain) {
+        this.domain = domain!=null?domain:new byte[0];
+        getData(true,true);
+    }
+
+    public void setSecurePort(int securePort) {
+        this.securePort = securePort;
+    }
+
+    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        int length = in.readInt();
+        byte[] message = new byte[length];
+        in.read(message);
+        getMember(message,this);
+        
+    }
+
+    public void writeExternal(ObjectOutput out) throws IOException {
+        byte[] data = this.getData();
+        out.writeInt(data.length);
+        out.write(data);
+    }
+    
+}

Modified: tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/ReceiverBase.java
URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/ReceiverBase.java?view=diff&rev=441543&r1=441542&r2=441543
==============================================================================
--- tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/ReceiverBase.java (original)
+++ tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/ReceiverBase.java Fri Sep  8 08:42:08 2006
@@ -49,6 +49,7 @@
     private String host = "auto";
     private InetAddress bind;
     private int port  = 4000;
+    private int securePort = -1;
     private int rxBufSize = 43800;
     private int txBufSize = 25188;
     private boolean listen = false;
@@ -299,6 +300,10 @@
         return useBufferPool;
     }
 
+    public int getSecurePort() {
+        return securePort;
+    }
+
     public void setTcpSelectorTimeout(long selTimeout) {
         tcpSelectorTimeout = selTimeout;
     }
@@ -375,6 +380,10 @@
 
     public void setUseBufferPool(boolean useBufferPool) {
         this.useBufferPool = useBufferPool;
+    }
+
+    public void setSecurePort(int securePort) {
+        this.securePort = securePort;
     }
 
     public void heartbeat() {

Added: tomcat/tc6.0.x/trunk/test/org/apache/catalina/tribes/test/TestNioSender.java
URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/test/org/apache/catalina/tribes/test/TestNioSender.java?view=auto&rev=441543
==============================================================================
--- tomcat/tc6.0.x/trunk/test/org/apache/catalina/tribes/test/TestNioSender.java (added)
+++ tomcat/tc6.0.x/trunk/test/org/apache/catalina/tribes/test/TestNioSender.java Fri Sep  8 08:42:08 2006
@@ -0,0 +1,106 @@
+package org.apache.catalina.tribes.test;
+
+import java.io.IOException;
+import java.nio.channels.SelectionKey;
+import java.util.Iterator;
+import java.nio.channels.Selector;
+import org.apache.catalina.tribes.transport.nio.NioSender;
+import org.apache.catalina.tribes.membership.MemberImpl;
+import org.apache.catalina.tribes.io.ChannelData;
+import org.apache.catalina.tribes.io.XByteBuffer;
+import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.Channel;
+
+/**
+ * <p>Title: </p>
+ *
+ * <p>Description: </p>
+ *
+ * <p>Copyright: Copyright (c) 2005</p>
+ *
+ * <p>Company: </p>
+ *
+ * @author not attributable
+ * @version 1.0
+ */
+public class TestNioSender {
+    private Selector selector = null;
+    private int counter = 0;
+    MemberImpl mbr;
+    private static int testOptions = Channel.SEND_OPTIONS_DEFAULT;
+    public TestNioSender()  {
+        
+    }
+    
+    public synchronized int inc() {
+        return ++counter;
+    }
+    
+    public synchronized ChannelData getMessage(Member mbr) {
+        String msg = new String("Thread-"+Thread.currentThread().getName()+" Message:"+inc());
+        ChannelData data = new ChannelData(true);
+        data.setMessage(new XByteBuffer(msg.getBytes(),false));
+        data.setAddress(mbr);
+        
+        return data;
+    }
+
+    public void init() throws Exception {
+        selector = Selector.open();
+        mbr = new MemberImpl("localhost",4444,0);
+        NioSender sender = new NioSender();
+        sender.setDestination(mbr);
+        sender.setDirectBuffer(true);
+        sender.setSelector(selector);
+        sender.setMessage(XByteBuffer.createDataPackage(getMessage(mbr)));
+        sender.connect();
+    }
+
+    public void run() {
+        while (true) {
+
+            int selectedKeys = 0;
+            try {
+                selectedKeys = selector.select(100);
+                //               if ( selectedKeys == 0 ) {
+                //                   System.out.println("No registered interests. Sleeping for a second.");
+                //                   Thread.sleep(100);
+            } catch (Exception e) {
+                e.printStackTrace();
+                continue;
+            }
+
+            if (selectedKeys == 0) {
+                continue;
+            }
+
+            Iterator it = selector.selectedKeys().iterator();
+            while (it.hasNext()) {
+                SelectionKey sk = (SelectionKey) it.next();
+                it.remove();
+                try {
+                    int readyOps = sk.readyOps();
+                    sk.interestOps(sk.interestOps() & ~readyOps);
+                    NioSender sender = (NioSender) sk.attachment();
+                    if ( sender.process(sk, (testOptions&Channel.SEND_OPTIONS_USE_ACK)==Channel.SEND_OPTIONS_USE_ACK) ) {
+                        System.out.println("Message completed for handler:"+sender);
+                        Thread.currentThread().sleep(2000);
+                        sender.reset();
+                        sender.setMessage(XByteBuffer.createDataPackage(getMessage(mbr)));
+                    }
+                    
+
+                } catch (Throwable t) {
+                    t.printStackTrace();
+                    return;
+                }
+            }
+        }
+    }
+
+    public static void main(String[] args) throws Exception {
+        TestNioSender sender = new TestNioSender();
+        sender.init();
+        sender.run();
+    }
+}

Added: tomcat/tc6.0.x/trunk/test/org/apache/catalina/tribes/test/TribesTestSuite.java
URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/test/org/apache/catalina/tribes/test/TribesTestSuite.java?view=auto&rev=441543
==============================================================================
--- tomcat/tc6.0.x/trunk/test/org/apache/catalina/tribes/test/TribesTestSuite.java (added)
+++ tomcat/tc6.0.x/trunk/test/org/apache/catalina/tribes/test/TribesTestSuite.java Fri Sep  8 08:42:08 2006
@@ -0,0 +1,24 @@
+package org.apache.catalina.tribes.test;
+
+import junit.framework.Test;
+import junit.framework.TestCase;
+import junit.framework.TestSuite;
+
+public class TribesTestSuite
+    extends TestCase {
+
+    public TribesTestSuite(String s) {
+        super(s);
+    }
+
+    public static Test suite() {
+        TestSuite suite = new TestSuite();
+        suite.addTestSuite(org.apache.catalina.tribes.test.channel.ChannelStartStop.class);
+        suite.addTestSuite(org.apache.catalina.tribes.test.channel.TestChannelOptionFlag.class);
+        suite.addTestSuite(org.apache.catalina.tribes.test.membership.MemberSerialization.class);
+        suite.addTestSuite(org.apache.catalina.tribes.test.membership.TestMemberArrival.class);
+        suite.addTestSuite(org.apache.catalina.tribes.test.membership.TestTcpFailureDetector.class);
+        suite.addTestSuite(org.apache.catalina.tribes.test.channel.TestDataIntegrity.class);
+        return suite;
+    }
+}

Added: tomcat/tc6.0.x/trunk/test/org/apache/catalina/tribes/test/channel/ChannelStartStop.java
URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/test/org/apache/catalina/tribes/test/channel/ChannelStartStop.java?view=auto&rev=441543
==============================================================================
--- tomcat/tc6.0.x/trunk/test/org/apache/catalina/tribes/test/channel/ChannelStartStop.java (added)
+++ tomcat/tc6.0.x/trunk/test/org/apache/catalina/tribes/test/channel/ChannelStartStop.java Fri Sep  8 08:42:08 2006
@@ -0,0 +1,118 @@
+/*
+ * Copyright 1999,2004 The Apache Software Foundation.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ */
+package org.apache.catalina.tribes.test.channel;
+
+import org.apache.catalina.tribes.group.GroupChannel;
+import junit.framework.TestCase;
+
+/**
+ * @author Filip Hanik
+ * @version 1.0
+ */
+public class ChannelStartStop extends TestCase {
+    GroupChannel channel = null;
+    protected void setUp() throws Exception {
+        super.setUp();
+        channel = new GroupChannel();
+    }
+
+    protected void tearDown() throws Exception {
+        super.tearDown();
+        try {channel.stop(channel.DEFAULT);}catch (Exception ignore){}
+    }
+    
+    public void testDoubleFullStart() throws Exception {
+        int count = 0;
+        try {
+            channel.start(channel.DEFAULT);
+            count++;
+        } catch ( Exception x){x.printStackTrace();}
+        try {
+            channel.start(channel.DEFAULT);
+            count++;
+        } catch ( Exception x){x.printStackTrace();}
+        assertEquals(count,2);
+        channel.stop(channel.DEFAULT);
+    }
+
+    public void testDoublePartialStart() throws Exception {
+        //try to double start the RX 
+        int count = 0;
+        try {
+            channel.start(channel.SND_RX_SEQ);
+            channel.start(channel.MBR_RX_SEQ);
+            count++;
+        } catch ( Exception x){x.printStackTrace();}
+        try {
+            channel.start(channel.MBR_RX_SEQ);
+            count++;
+        } catch ( Exception x){/*expected*/}
+        assertEquals(count,1);
+        channel.stop(channel.DEFAULT);
+        //double the membership sender
+        count = 0;
+        try {
+            channel.start(channel.SND_RX_SEQ);
+            channel.start(channel.MBR_TX_SEQ);
+            count++;
+        } catch ( Exception x){x.printStackTrace();}
+        try {
+            channel.start(channel.MBR_TX_SEQ);
+            count++;
+        } catch ( Exception x){/*expected*/}
+        assertEquals(count,1);
+        channel.stop(channel.DEFAULT);
+        
+        count = 0;
+        try {
+            channel.start(channel.SND_RX_SEQ);
+            count++;
+        } catch ( Exception x){x.printStackTrace();}
+        try {
+            channel.start(channel.SND_RX_SEQ);
+            count++;
+        } catch ( Exception x){/*expected*/}
+        assertEquals(count,1);
+        channel.stop(channel.DEFAULT);
+
+        count = 0;
+        try {
+            channel.start(channel.SND_TX_SEQ);
+            count++;
+        } catch ( Exception x){x.printStackTrace();}
+        try {
+            channel.start(channel.SND_TX_SEQ);
+            count++;
+        } catch ( Exception x){/*expected*/}
+        assertEquals(count,1);
+        channel.stop(channel.DEFAULT);
+    }
+    
+    public void testFalseOption() throws Exception {
+        int flag = 0xFFF0;//should get ignored by the underlying components
+        int count = 0;
+        try {
+            channel.start(flag);
+            count++;
+        } catch ( Exception x){x.printStackTrace();}
+        try {
+            channel.start(flag);
+            count++;
+        } catch ( Exception x){/*expected*/}
+        assertEquals(count,2);
+        channel.stop(channel.DEFAULT);
+    }
+
+}

Added: tomcat/tc6.0.x/trunk/test/org/apache/catalina/tribes/test/channel/TestChannelOptionFlag.java
URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/test/org/apache/catalina/tribes/test/channel/TestChannelOptionFlag.java?view=auto&rev=441543
==============================================================================
--- tomcat/tc6.0.x/trunk/test/org/apache/catalina/tribes/test/channel/TestChannelOptionFlag.java (added)
+++ tomcat/tc6.0.x/trunk/test/org/apache/catalina/tribes/test/channel/TestChannelOptionFlag.java Fri Sep  8 08:42:08 2006
@@ -0,0 +1,76 @@
+package org.apache.catalina.tribes.test.channel;
+
+import junit.framework.*;
+import org.apache.catalina.tribes.group.*;
+import org.apache.catalina.tribes.ChannelInterceptor;
+import org.apache.catalina.tribes.ChannelException;
+
+/**
+ * <p>Title: </p> 
+ * 
+ * <p>Description: </p> 
+ * 
+ * <p>Copyright: Copyright (c) 2005</p> 
+ * 
+ * <p>Company: </p>
+ * 
+ * @author not attributable
+ * @version 1.0
+ */
+public class TestChannelOptionFlag extends TestCase {
+    GroupChannel channel = null;
+    protected void setUp() throws Exception {
+        super.setUp();
+        channel = new GroupChannel();
+    }
+
+    protected void tearDown() throws Exception {
+        super.tearDown();
+        if ( channel != null ) try {channel.stop(channel.DEFAULT);}catch ( Exception ignore) {}
+        channel = null;
+    }
+    
+    
+    public void testOptionConflict() throws Exception {
+        boolean error = false;
+        channel.setOptionCheck(true);
+        ChannelInterceptor i = new TestInterceptor();
+        i.setOptionFlag(128);
+        channel.addInterceptor(i);
+        i = new TestInterceptor();
+        i.setOptionFlag(128);
+        channel.addInterceptor(i);
+        try {
+            channel.start(channel.DEFAULT);
+        }catch ( ChannelException x ) {
+            if ( x.getMessage().indexOf("option flag conflict") >= 0 ) error = true;
+        }
+        assertEquals(true,error);
+    }
+
+    public void testOptionNoConflict() throws Exception {
+        boolean error = false;
+        channel.setOptionCheck(true);
+        ChannelInterceptor i = new TestInterceptor();
+        i.setOptionFlag(128);
+        channel.addInterceptor(i);
+        i = new TestInterceptor();
+        i.setOptionFlag(64);
+        channel.addInterceptor(i);
+        i = new TestInterceptor();
+        i.setOptionFlag(256);
+        channel.addInterceptor(i);
+        try {
+            channel.start(channel.DEFAULT);
+        }catch ( ChannelException x ) {
+            if ( x.getMessage().indexOf("option flag conflict") >= 0 ) error = true;
+        }
+        assertEquals(false,error);
+    }
+    
+    public static class TestInterceptor extends ChannelInterceptorBase {
+        
+    }
+
+
+}

Added: tomcat/tc6.0.x/trunk/test/org/apache/catalina/tribes/test/channel/TestDataIntegrity.java
URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/test/org/apache/catalina/tribes/test/channel/TestDataIntegrity.java?view=auto&rev=441543
==============================================================================
--- tomcat/tc6.0.x/trunk/test/org/apache/catalina/tribes/test/channel/TestDataIntegrity.java (added)
+++ tomcat/tc6.0.x/trunk/test/org/apache/catalina/tribes/test/channel/TestDataIntegrity.java Fri Sep  8 08:42:08 2006
@@ -0,0 +1,173 @@
+package org.apache.catalina.tribes.test.channel;
+
+import junit.framework.TestCase;
+import java.io.Serializable;
+import java.util.Random;
+import java.util.Arrays;
+import org.apache.catalina.tribes.ChannelListener;
+import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.group.GroupChannel;
+import org.apache.catalina.tribes.test.channel.TestDataIntegrity.Listener;
+import org.apache.catalina.tribes.group.interceptors.MessageDispatchInterceptor;
+import org.apache.catalina.tribes.group.interceptors.MessageDispatch15Interceptor;
+
+/**
+ * <p>Title: </p> 
+ * 
+ * <p>Description: </p> 
+ * 
+ * <p>Copyright: Copyright (c) 2005</p> 
+ * 
+ * <p>Company: </p>
+ * 
+ * @author not attributable
+ * @version 1.0
+ */
+public class TestDataIntegrity extends TestCase {
+    int msgCount = 500;
+    int threadCount = 20;
+    GroupChannel channel1;
+    GroupChannel channel2;
+    Listener listener1;
+    int threadCounter = 0;
+    protected void setUp() throws Exception {
+        super.setUp();
+        channel1 = new GroupChannel();
+        channel1.addInterceptor(new MessageDispatch15Interceptor());
+        channel2 = new GroupChannel();
+        channel2.addInterceptor(new MessageDispatch15Interceptor());
+        listener1 = new Listener();
+        channel2.addChannelListener(listener1);
+        channel1.start(GroupChannel.DEFAULT);
+        channel2.start(GroupChannel.DEFAULT);
+    }
+
+    protected void tearDown() throws Exception {
+        super.tearDown();
+        channel1.stop(GroupChannel.DEFAULT);
+        channel2.stop(GroupChannel.DEFAULT);
+    }
+    
+    public void testDataSendNO_ACK() throws Exception {
+        System.err.println("Starting NO_ACK");
+        Thread[] threads = new Thread[threadCount];
+        for (int x=0; x<threads.length; x++ ) {
+            threads[x] = new Thread() {
+                public void run() {
+                    try {
+                        for (int i = 0; i < msgCount; i++) channel1.send(new Member[] {channel2.getLocalMember(false)}, Data.createRandomData(),0);
+                    }catch ( Exception x ) {
+                        x.printStackTrace();
+                        return;
+                    } finally {
+                        threadCounter++;
+                    }
+                }
+            };
+        }
+        for (int x=0; x<threads.length; x++ ) { threads[x].start();}
+        for (int x=0; x<threads.length; x++ ) { threads[x].join();}
+        //sleep for 50 sec, let the other messages in
+        long start = System.currentTimeMillis();
+        while ( (System.currentTimeMillis()-start)<30000 && msgCount*threadCount!=listener1.count) Thread.sleep(500);
+        System.err.println("Finished NO_ACK");
+        assertEquals("Checking success messages.",msgCount*threadCount,listener1.count);
+    }
+    
+    public void testDataSendASYNCM() throws Exception {
+            System.err.println("Starting ASYNC MULTI THREAD");
+            Thread[] threads = new Thread[threadCount];
+            for (int x=0; x<threads.length; x++ ) {
+                threads[x] = new Thread() {
+                    public void run() {
+                        try {
+                            for (int i = 0; i < msgCount; i++) channel1.send(new Member[] {channel2.getLocalMember(false)}, Data.createRandomData(),GroupChannel.SEND_OPTIONS_ASYNCHRONOUS);
+                        }catch ( Exception x ) {
+                            x.printStackTrace();
+                            return;
+                        } finally {
+                            threadCounter++;
+                        }
+                    }
+                };
+            }
+            for (int x=0; x<threads.length; x++ ) { threads[x].start();}
+            for (int x=0; x<threads.length; x++ ) { threads[x].join();}
+            //sleep for 50 sec, let the other messages in
+            long start = System.currentTimeMillis();
+            while ( (System.currentTimeMillis()-start)<15000 && msgCount*threadCount!=listener1.count) Thread.sleep(500);
+            System.err.println("Finished ASYNC MULTI THREAD");
+            assertEquals("Checking success messages.",msgCount*threadCount,listener1.count);
+    }
+    public void testDataSendASYNC() throws Exception {
+        System.err.println("Starting ASYNC");
+        for (int i=0; i<msgCount; i++) channel1.send(new Member[] {channel2.getLocalMember(false)},Data.createRandomData(),GroupChannel.SEND_OPTIONS_ASYNCHRONOUS);
+        //sleep for 50 sec, let the other messages in
+        long start = System.currentTimeMillis();
+        while ( (System.currentTimeMillis()-start)<5000 && msgCount!=listener1.count) Thread.sleep(500);
+        System.err.println("Finished ASYNC");
+        assertEquals("Checking success messages.",msgCount,listener1.count);
+    }
+
+    public void testDataSendACK() throws Exception {
+        System.err.println("Starting ACK");
+        for (int i=0; i<msgCount; i++) channel1.send(new Member[] {channel2.getLocalMember(false)},Data.createRandomData(),GroupChannel.SEND_OPTIONS_USE_ACK);
+        Thread.sleep(250);
+        System.err.println("Finished ACK");
+        assertEquals("Checking success messages.",msgCount,listener1.count);
+    }
+
+    public void testDataSendSYNCACK() throws Exception {
+        System.err.println("Starting SYNC_ACK");
+        for (int i=0; i<msgCount; i++) channel1.send(new Member[] {channel2.getLocalMember(false)},Data.createRandomData(),GroupChannel.SEND_OPTIONS_SYNCHRONIZED_ACK|GroupChannel.SEND_OPTIONS_USE_ACK);
+        Thread.sleep(250);
+        System.err.println("Finished SYNC_ACK");
+        assertEquals("Checking success messages.",msgCount,listener1.count);
+    }
+
+    public static class Listener implements ChannelListener {
+        long count = 0;
+        public boolean accept(Serializable s, Member m) {
+            return (s instanceof Data);
+        }
+        
+        public void messageReceived(Serializable s, Member m) {
+            Data d = (Data)s;
+            if ( !Data.verify(d) ) {
+                System.err.println("ERROR");
+            } else {
+                count++;
+                if ((count %1000) ==0 ) {
+                    System.err.println("SUCCESS:"+count);
+                }
+            }
+        }
+    }
+    
+    public static class Data implements Serializable {
+        public int length;
+        public byte[] data;
+        public byte key;
+        public static Random r = new Random(System.currentTimeMillis());
+        public static Data createRandomData() {
+            int i = r.nextInt();
+            i = ( i % 127 );
+            int length = Math.abs(r.nextInt() % 65555);
+            Data d = new Data();
+            d.length = length;
+            d.key = (byte)i;
+            d.data = new byte[length];
+            Arrays.fill(d.data,d.key);
+            return d;
+        }
+        
+        public static boolean verify(Data d) {
+            boolean result = (d.length == d.data.length);
+            for ( int i=0; result && (i<d.data.length); i++ ) result = result && d.data[i] == d.key;
+            return result;
+        }
+    }
+    
+    
+
+}

Added: tomcat/tc6.0.x/trunk/test/org/apache/catalina/tribes/test/channel/TestRemoteProcessException.java
URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/test/org/apache/catalina/tribes/test/channel/TestRemoteProcessException.java?view=auto&rev=441543
==============================================================================
--- tomcat/tc6.0.x/trunk/test/org/apache/catalina/tribes/test/channel/TestRemoteProcessException.java (added)
+++ tomcat/tc6.0.x/trunk/test/org/apache/catalina/tribes/test/channel/TestRemoteProcessException.java Fri Sep  8 08:42:08 2006
@@ -0,0 +1,123 @@
+package org.apache.catalina.tribes.test.channel;
+
+import junit.framework.TestCase;
+import java.io.Serializable;
+import java.util.Random;
+import java.util.Arrays;
+import org.apache.catalina.tribes.ChannelListener;
+import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.group.GroupChannel;
+import java.io.PrintStream;
+
+/**
+ * <p>Title: </p> 
+ * 
+ * <p>Description: </p> 
+ * 
+ * <p>Copyright: Copyright (c) 2005</p> 
+ * 
+ * <p>Company: </p>
+ * 
+ * @author not attributable
+ * @version 1.0
+ */
+public class TestRemoteProcessException extends TestCase {
+    int msgCount = 10000;
+    GroupChannel channel1;
+    GroupChannel channel2;
+    Listener listener1;
+    protected void setUp() throws Exception {
+        super.setUp();
+        channel1 = new GroupChannel();
+        channel2 = new GroupChannel();
+        listener1 = new Listener();
+        channel2.addChannelListener(listener1);
+        channel1.start(GroupChannel.DEFAULT);
+        channel2.start(GroupChannel.DEFAULT);
+    }
+
+    protected void tearDown() throws Exception {
+        super.tearDown();
+        channel1.stop(GroupChannel.DEFAULT);
+        channel2.stop(GroupChannel.DEFAULT);
+    }
+
+    public void testDataSendSYNCACK() throws Exception {
+        System.err.println("Starting SYNC_ACK");
+        int errC=0, nerrC=0;
+        for (int i=0; i<msgCount; i++) {
+            boolean error = Data.r.nextBoolean();
+            channel1.send(channel1.getMembers(),Data.createRandomData(error),GroupChannel.SEND_OPTIONS_SYNCHRONIZED_ACK|GroupChannel.SEND_OPTIONS_USE_ACK);
+            if ( error ) errC++; else nerrC++;
+        }
+        System.err.println("Finished SYNC_ACK");
+        assertEquals("Checking failure messages.",errC,listener1.errCnt);
+        assertEquals("Checking success messages.",nerrC,listener1.noErrCnt);
+        assertEquals("Checking all messages.",msgCount,listener1.noErrCnt+listener1.errCnt);
+        System.out.println("Listener 1 stats:");
+        listener1.printStats(System.out);
+    }
+
+    public static class Listener implements ChannelListener {
+        long noErrCnt = 0;
+        long errCnt = 0;
+        public boolean accept(Serializable s, Member m) {
+            return (s instanceof Data);
+        }
+
+        public void messageReceived(Serializable s, Member m) {
+            Data d = (Data)s;
+            if ( !Data.verify(d) ) {
+                System.err.println("ERROR");
+            } else {
+                if (d.error) {
+                    errCnt++;
+                    if ( (errCnt % 100) == 0) {
+                        printStats(System.err);
+                    }
+                    throw new IllegalArgumentException();
+                } else {
+                    noErrCnt++;
+                    if ( (noErrCnt % 100) == 0) {
+                        printStats(System.err);
+                    }
+                }
+            }
+        }
+
+        public void printStats(PrintStream stream) {
+            stream.println("NORMAL:" + noErrCnt);
+            stream.println("FAILURES:" + errCnt);
+            stream.println("TOTAL:" + (errCnt+noErrCnt));
+        }
+    }
+
+    public static class Data implements Serializable {
+        public int length;
+        public byte[] data;
+        public byte key;
+        public boolean error = false;
+        public static Random r = new Random(System.currentTimeMillis());
+        public static Data createRandomData(boolean error) {
+            int i = r.nextInt();
+            i = ( i % 127 );
+            int length = Math.abs(r.nextInt() % 65555);
+            Data d = new Data();
+            d.length = length;
+            d.key = (byte)i;
+            d.data = new byte[length];
+            Arrays.fill(d.data,d.key);
+            d.error = error;
+            return d;
+        }
+
+        public static boolean verify(Data d) {
+            boolean result = (d.length == d.data.length);
+            for ( int i=0; result && (i<d.data.length); i++ ) result = result && d.data[i] == d.key;
+            return result;
+        }
+    }
+
+
+
+}

Added: tomcat/tc6.0.x/trunk/test/org/apache/catalina/tribes/test/interceptors/TestNonBlockingCoordinator.java
URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/test/org/apache/catalina/tribes/test/interceptors/TestNonBlockingCoordinator.java?view=auto&rev=441543
==============================================================================
--- tomcat/tc6.0.x/trunk/test/org/apache/catalina/tribes/test/interceptors/TestNonBlockingCoordinator.java (added)
+++ tomcat/tc6.0.x/trunk/test/org/apache/catalina/tribes/test/interceptors/TestNonBlockingCoordinator.java Fri Sep  8 08:42:08 2006
@@ -0,0 +1,95 @@
+package org.apache.catalina.tribes.test.interceptors;
+
+import org.apache.catalina.tribes.Channel;
+import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.group.GroupChannel;
+import org.apache.catalina.tribes.group.interceptors.NonBlockingCoordinator;
+import org.apache.catalina.tribes.group.interceptors.TcpFailureDetector;
+import junit.framework.TestCase;
+import junit.framework.TestResult;
+import junit.framework.TestSuite;
+
+public class TestNonBlockingCoordinator extends TestCase {
+
+    GroupChannel[] channels = null;
+    NonBlockingCoordinator[] coordinators = null;
+    int channelCount = 10;
+    Thread[] threads = null;
+    protected void setUp() throws Exception {
+        System.out.println("Setup");
+        super.setUp();
+        channels = new GroupChannel[channelCount];
+        coordinators = new NonBlockingCoordinator[channelCount];
+        threads = new Thread[channelCount];
+        for ( int i=0; i<channelCount; i++ ) {
+            channels[i] = new GroupChannel();
+            coordinators[i] = new NonBlockingCoordinator();
+            channels[i].addInterceptor(coordinators[i]);
+            channels[i].addInterceptor(new TcpFailureDetector());
+            final int j = i;
+            threads[i] = new Thread() {
+                public void run() {
+                    try {
+                        channels[j].start(Channel.DEFAULT);
+                        Thread.sleep(50);
+                    } catch (Exception x) {
+                        x.printStackTrace();
+                    }
+                }
+            };
+        }
+        for ( int i=0; i<channelCount; i++ ) threads[i].start();
+        for ( int i=0; i<channelCount; i++ ) threads[i].join();
+        Thread.sleep(1000);
+    }
+    
+    public void testCoord1() throws Exception {
+        for (int i=1; i<channelCount; i++ ) 
+            assertEquals("Message count expected to be equal.",channels[i-1].getMembers().length,channels[i].getMembers().length);
+        Member member = coordinators[0].getCoordinator();
+        int cnt = 0;
+        while ( member == null && (cnt++ < 100 ) ) try {Thread.sleep(100); member = coordinators[0].getCoordinator();}catch ( Exception x){}
+        for (int i=0; i<channelCount; i++ ) super.assertEquals(member,coordinators[i].getCoordinator());
+        System.out.println("Coordinator[1] is:"+member);
+        
+    }
+    
+    public void testCoord2() throws Exception {
+        Member member = coordinators[1].getCoordinator();
+        System.out.println("Coordinator[2a] is:" + member);
+        int index = -1;
+        for ( int i=0; i<channelCount; i++ ) {
+            if ( channels[i].getLocalMember(false).equals(member) ) {
+                System.out.println("Shutting down:" + channels[i].getLocalMember(true).toString());
+                channels[i].stop(Channel.DEFAULT);
+                index = i;
+            }
+        }
+        int dead = index;
+        Thread.sleep(1000);
+        if ( index == 0 ) index = 1; else index = 0;
+        System.out.println("Member count:"+channels[index].getMembers().length);
+        member = coordinators[index].getCoordinator();
+        for (int i = 1; i < channelCount; i++) if ( i != dead ) super.assertEquals(member, coordinators[i].getCoordinator());
+        System.out.println("Coordinator[2b] is:" + member);
+    }
+
+    protected void tearDown() throws Exception {
+        System.out.println("tearDown");
+        super.tearDown();
+        for ( int i=0; i<channelCount; i++ ) {
+            channels[i].stop(Channel.DEFAULT);
+        }
+    }
+    
+    public static void main(String[] args) throws Exception {
+        TestSuite suite = new TestSuite();
+        suite.addTestSuite(TestNonBlockingCoordinator.class);
+        suite.run(new TestResult());
+    }
+    
+    
+    
+    
+
+}

Added: tomcat/tc6.0.x/trunk/test/org/apache/catalina/tribes/test/interceptors/TestTwoPhaseCommit.java
URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/test/org/apache/catalina/tribes/test/interceptors/TestTwoPhaseCommit.java?view=auto&rev=441543
==============================================================================
--- tomcat/tc6.0.x/trunk/test/org/apache/catalina/tribes/test/interceptors/TestTwoPhaseCommit.java (added)
+++ tomcat/tc6.0.x/trunk/test/org/apache/catalina/tribes/test/interceptors/TestTwoPhaseCommit.java Fri Sep  8 08:42:08 2006
@@ -0,0 +1,42 @@
+/*
+ * Copyright 1999,2004 The Apache Software Foundation.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ */
+
+package org.apache.catalina.tribes.test.interceptors;
+
+import junit.framework.TestCase;
+
+/**
+ * <p>Title: </p> 
+ * 
+ * <p>Description: </p> 
+ * 
+ * <p>Copyright: Copyright (c) 2005</p> 
+ * 
+ * <p>Company: </p>
+ * 
+ * @author not attributable
+ * @version 1.0
+ */
+public class TestTwoPhaseCommit extends TestCase {
+
+    protected void setUp() throws Exception {
+        super.setUp();
+    }
+
+    protected void tearDown() throws Exception {
+        super.tearDown();
+    }
+
+}

Added: tomcat/tc6.0.x/trunk/test/org/apache/catalina/tribes/test/io/TestSenderConnections.java
URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/test/org/apache/catalina/tribes/test/io/TestSenderConnections.java?view=auto&rev=441543
==============================================================================
--- tomcat/tc6.0.x/trunk/test/org/apache/catalina/tribes/test/io/TestSenderConnections.java (added)
+++ tomcat/tc6.0.x/trunk/test/org/apache/catalina/tribes/test/io/TestSenderConnections.java Fri Sep  8 08:42:08 2006
@@ -0,0 +1,112 @@
+package org.apache.catalina.tribes.test.io;
+
+import java.util.ArrayList;
+
+import org.apache.catalina.tribes.Channel;
+import org.apache.catalina.tribes.ManagedChannel;
+import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.MembershipListener;
+import org.apache.catalina.tribes.group.GroupChannel;
+import junit.framework.TestCase;
+import org.apache.catalina.tribes.ChannelListener;
+import java.io.Serializable;
+import java.util.Random;
+import java.util.HashMap;
+import org.apache.catalina.tribes.transport.ReplicationTransmitter;
+
+public class TestSenderConnections extends TestCase {
+    private static int count = 2;
+    private ManagedChannel[] channels = new ManagedChannel[count];
+    private TestMsgListener[] listeners = new TestMsgListener[count];
+
+    protected void setUp() throws Exception {
+        super.setUp();
+        for (int i = 0; i < channels.length; i++) {
+            channels[i] = new GroupChannel();
+            channels[i].getMembershipService().setPayload( ("Channel-" + (i + 1)).getBytes("ASCII"));
+            listeners[i] = new TestMsgListener( ("Listener-" + (i + 1)));
+            channels[i].addChannelListener(listeners[i]);
+            channels[i].start(Channel.SND_RX_SEQ|Channel.SND_TX_SEQ);
+
+        }
+    }
+
+    public void clear() {
+    }
+
+    public void sendMessages(long delay, long sleep) throws Exception {
+        Member local = channels[0].getLocalMember(true);
+        Member dest = channels[1].getLocalMember(true);
+        int n = 3;
+        System.out.println("Sending " + n + " messages from [" + local.getName() + "] to [" + dest.getName() + "]");
+        for (int i = 0; i < n; i++) {
+            channels[0].send(new Member[] {dest}, new TestMsg(), 0);
+            if ( delay > 0 ) Thread.sleep(delay);
+        }
+        System.out.println("Messages sent. Sleeping for "+(sleep/1000)+" seconds to inspect connections");
+        if ( sleep > 0 ) Thread.sleep(sleep);
+
+    }
+
+    public void testConnectionLinger() throws Exception {
+        sendMessages(0,15000);
+    }
+    
+    public void testKeepAliveCount() throws Exception {
+        System.out.println("Setting keep alive count to 0");
+        for (int i = 0; i < channels.length; i++) {
+            ReplicationTransmitter t = (ReplicationTransmitter)channels[0].getChannelSender();
+            t.getTransport().setKeepAliveCount(0);
+        }
+        sendMessages(1000,15000);
+    }
+
+    public void testKeepAliveTime() throws Exception {
+        System.out.println("Setting keep alive count to 1 second");
+        for (int i = 0; i < channels.length; i++) {
+            ReplicationTransmitter t = (ReplicationTransmitter)channels[0].getChannelSender();
+            t.getTransport().setKeepAliveTime(1000);
+        }
+        sendMessages(2000,15000);
+    }
+
+    protected void tearDown() throws Exception {
+        for (int i = 0; i < channels.length; i++) {
+            channels[i].stop(Channel.DEFAULT);
+        }
+
+    }
+    
+    public static class TestMsg implements Serializable {
+        static Random r = new Random(System.currentTimeMillis());
+        HashMap map = new HashMap();
+        public TestMsg() {
+            int size = Math.abs(r.nextInt() % 200);
+            for (int i=0; i<size; i++ ) {
+                int length = Math.abs(r.nextInt() %65000);
+                ArrayList list = new ArrayList(length);
+                map.put(new Integer(i),list);
+            }
+        }
+    }
+
+    public class TestMsgListener implements ChannelListener {
+        public String name = null;
+        public TestMsgListener(String name) {
+            this.name = name;
+        }
+        
+        public void messageReceived(Serializable msg, Member sender) {
+            System.out.println("["+name+"] Received message:"+msg+" from " + sender.getName());
+        }
+
+    
+        public boolean accept(Serializable msg, Member sender) {
+            return true;
+        }
+
+
+        
+    }
+
+}

Added: tomcat/tc6.0.x/trunk/test/org/apache/catalina/tribes/test/io/TestSerialization.java
URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/test/org/apache/catalina/tribes/test/io/TestSerialization.java?view=auto&rev=441543
==============================================================================
--- tomcat/tc6.0.x/trunk/test/org/apache/catalina/tribes/test/io/TestSerialization.java (added)
+++ tomcat/tc6.0.x/trunk/test/org/apache/catalina/tribes/test/io/TestSerialization.java Fri Sep  8 08:42:08 2006
@@ -0,0 +1,24 @@
+package org.apache.catalina.tribes.test.io;
+
+import org.apache.catalina.tribes.io.XByteBuffer;
+import junit.framework.TestCase;
+
+public class TestSerialization extends TestCase {
+    protected void setUp() throws Exception {
+        super.setUp();
+    }
+    
+    public void testEmptyArray() throws Exception {
+        
+    }
+
+    protected void tearDown() throws Exception {
+        super.tearDown();
+    }
+    
+    public static void main(String[] args) throws Exception {
+        //XByteBuffer.deserialize(new byte[0]);
+        XByteBuffer.deserialize(new byte[] {-84, -19, 0, 5, 115, 114, 0, 17, 106});
+    }
+
+}

Added: tomcat/tc6.0.x/trunk/test/org/apache/catalina/tribes/test/membership/MemberSerialization.java
URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/test/org/apache/catalina/tribes/test/membership/MemberSerialization.java?view=auto&rev=441543
==============================================================================
--- tomcat/tc6.0.x/trunk/test/org/apache/catalina/tribes/test/membership/MemberSerialization.java (added)
+++ tomcat/tc6.0.x/trunk/test/org/apache/catalina/tribes/test/membership/MemberSerialization.java Fri Sep  8 08:42:08 2006
@@ -0,0 +1,100 @@
+/*
+ * Copyright 1999,2004 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ */
+package org.apache.catalina.tribes.test.membership;
+
+import junit.framework.TestCase;
+import org.apache.catalina.tribes.membership.MemberImpl;
+import java.util.Arrays;
+
+/**
+ * <p>Title: </p>
+ *
+ * <p>Description: </p>
+ *
+ * <p>Copyright: Copyright (c) 2005</p>
+ *
+ * <p>Company: </p>
+ *
+ * @author not attributable
+ * @version 1.0
+ */
+public class MemberSerialization extends TestCase {
+    MemberImpl m1, m2, p1,p2;
+    byte[] payload = null;
+    protected void setUp() throws Exception {
+        super.setUp();
+        payload = new byte[333];
+        Arrays.fill(payload,(byte)1);
+        m1 = new MemberImpl("localhost",3333,1,payload);
+        m2 = new MemberImpl("localhost",3333,1);
+        payload = new byte[333];
+        Arrays.fill(payload,(byte)2);
+        p1 = new MemberImpl("127.0.0.1",3333,1,payload);
+        p2 = new MemberImpl("localhost",3331,1,payload);
+        m1.setDomain(new byte[] {1,2,3,4,5,6,7,8,9});
+        m2.setDomain(new byte[] {1,2,3,4,5,6,7,8,9});
+        m1.setCommand(new byte[] {1,2,4,5,6,7,8,9});
+        m2.setCommand(new byte[] {1,2,4,5,6,7,8,9});
+    }
+    
+    public void testCompare() throws Exception {
+        assertTrue(m1.equals(m2));
+        assertTrue(m2.equals(m1));
+        assertTrue(p1.equals(m2));
+        assertFalse(m1.equals(p2));
+        assertFalse(m1.equals(p2));
+        assertFalse(m2.equals(p2));
+        assertFalse(p1.equals(p2));
+    }
+    
+    public void testSerializationOne() throws Exception {
+        MemberImpl m = m1;
+        byte[] md1 = m.getData(false,true);
+        byte[] mda1 = m.getData(false,false);
+        assertTrue(Arrays.equals(md1,mda1));
+        assertTrue(md1==mda1);
+        mda1 = m.getData(true,true);
+        MemberImpl ma1 = MemberImpl.getMember(mda1);
+        assertTrue(compareMembers(m,ma1));
+        mda1 = p1.getData(false);
+        assertFalse(Arrays.equals(md1,mda1));
+        ma1 = MemberImpl.getMember(mda1);
+        assertTrue(compareMembers(p1,ma1));
+        
+        md1 = m.getData(true,true);
+        Thread.sleep(50);
+        mda1 = m.getData(true,true);
+        MemberImpl a1 = MemberImpl.getMember(md1);
+        MemberImpl a2 = MemberImpl.getMember(mda1);
+        assertTrue(a1.equals(a2));
+        assertFalse(Arrays.equals(md1,mda1));
+        
+        
+    }
+    
+    public boolean compareMembers(MemberImpl impl1, MemberImpl impl2) {
+        boolean result = true;
+        result = result && Arrays.equals(impl1.getHost(),impl2.getHost());
+        result = result && Arrays.equals(impl1.getPayload(),impl2.getPayload());
+        result = result && Arrays.equals(impl1.getUniqueId(),impl2.getUniqueId());
+        result = result && impl1.getPort() == impl2.getPort();
+        return result;
+    }
+
+    protected void tearDown() throws Exception {
+        super.tearDown();
+    }
+
+}



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org