You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by fh...@apache.org on 2006/03/02 23:58:05 UTC

svn commit: r382542 - in /tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes: mcast/ tcp/ tcp/bio/ tcp/nio/

Author: fhanik
Date: Thu Mar  2 14:58:03 2006
New Revision: 382542

URL: http://svn.apache.org/viewcvs?rev=382542&view=rev
Log:
Optimized serialization even further.
Implemented an abstract sender pool class

Modified:
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/McastMember.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSenders.properties
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationListener.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationTransmitter.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/SinglePointSender.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/PooledSocketSender.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/SinglePointDataSender.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ParallelNioSender.java

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/McastMember.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/McastMember.java?rev=382542&r1=382541&r2=382542&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/McastMember.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/McastMember.java Thu Mar  2 14:58:03 2006
@@ -81,6 +81,8 @@
      * For the local member only
      */
     protected transient long serviceStartTime;
+    
+    protected transient byte[] dataPkg = null;
 
     /**
      * Empty constructor for serialization
@@ -136,6 +138,14 @@
      * @throws Exception
      */
     public byte[] getData()  {
+        long alive=System.currentTimeMillis()-getServiceStartTime();
+        
+        //look in cache first
+        if ( dataPkg!=null ) {
+            XByteBuffer.toBytes((long)alive,dataPkg,0);
+            return dataPkg;
+        }
+        
         //package looks like
         //alive - 8 bytes
         //port - 4 bytes
@@ -145,7 +155,7 @@
         byte[] domaind = this.domain;
         byte[] addr = host;
         byte[] data = new byte[8+4+addr.length+4+domaind.length];
-        long alive=System.currentTimeMillis()-getServiceStartTime();
+        
         
         //reduce byte copying
         //System.arraycopy(XByteBuffer.toBytes((long)alive),0,data,0,8);
@@ -162,6 +172,7 @@
         XByteBuffer.toBytes(domaind.length,data,16);
         
         System.arraycopy(domaind,0,data,20,domaind.length);
+        dataPkg = data;
         return data;
     }
     /**
@@ -369,9 +380,11 @@
 
     public void setDomain(String domain) {
         this.domain = domain.getBytes();
+        this.dataPkg = null;
     }
     public void setPort(int port) {
         this.port = port;
+        this.dataPkg = null;
     }
 
     public void setServiceStartTime(long serviceStartTime) {

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java?rev=382542&r1=382541&r2=382542&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java Thu Mar  2 14:58:03 2006
@@ -35,5 +35,10 @@
     public boolean isConnected();
     public void setRxBufSize(int size);
     public void setTxBufSize(int size);
+    public boolean checkKeepAlive();
+    public void setTimeout(long timeout);
+    public void setWaitForAck(boolean isWaitForAck);
+
+
 
 }

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSenders.properties
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSenders.properties?rev=382542&r1=382541&r2=382542&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSenders.properties (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSenders.properties Thu Mar  2 14:58:03 2006
@@ -1,4 +1,4 @@
 fastasyncqueue=org.apache.catalina.tribes.tcp.bio.FastAsyncSocketSender
 synchronous=org.apache.catalina.tribes.tcp.bio.SocketSender
 pooled=org.apache.catalina.tribes.tcp.bio.PooledSocketSender
-parallel=org.apache.catalina.tribes.tcp.nio.PooledNioSender
\ No newline at end of file
+parallel=org.apache.catalina.tribes.tcp.nio.PooledParallelSender
\ No newline at end of file

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationListener.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationListener.java?rev=382542&r1=382541&r2=382542&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationListener.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationListener.java Thu Mar  2 14:58:03 2006
@@ -31,9 +31,8 @@
 import org.apache.catalina.tribes.MessageListener;
 import org.apache.catalina.tribes.io.ListenCallback;
 import org.apache.catalina.tribes.io.ObjectReader;
-import org.apache.catalina.tribes.io.XByteBuffer;
+import org.apache.catalina.tribes.tcp.nio.TcpReplicationThread;
 import org.apache.catalina.util.StringManager;
-import org.apache.catalina.tribes.tcp.nio.*;
 
 /**
  * @author Filip Hanik

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationTransmitter.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationTransmitter.java?rev=382542&r1=382541&r2=382542&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationTransmitter.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationTransmitter.java Thu Mar  2 14:58:03 2006
@@ -154,7 +154,7 @@
     /**
      * @return The ack timeout
      */
-    public long getAckTimeout() {
+    public long getTimeout() {
         return ackTimeout;
     }
 

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/SinglePointSender.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/SinglePointSender.java?rev=382542&r1=382541&r2=382542&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/SinglePointSender.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/SinglePointSender.java Thu Mar  2 14:58:03 2006
@@ -33,17 +33,9 @@
     public void setPort(int port);
     public int getPort();
     public void sendMessage(ChannelMessage data) throws ChannelException;
-    public boolean isConnected();
     public void setSuspect(boolean suspect);
     public boolean getSuspect();
-    public void setAckTimeout(long timeout);
-    public long getAckTimeout();
-    public boolean getWaitForAck();
-    public void setWaitForAck(boolean isWaitForAck);
-    public boolean checkKeepAlive();
     public String getDomain() ;
     public void setDomain(String domain) ;
-    public void setRxBufSize(int size);
-    public void setTxBufSize(int size);
 
 }

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/PooledSocketSender.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/PooledSocketSender.java?rev=382542&r1=382541&r2=382542&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/PooledSocketSender.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/PooledSocketSender.java Thu Mar  2 14:58:03 2006
@@ -25,6 +25,8 @@
 import org.apache.catalina.tribes.tcp.PooledSender;
 import org.apache.catalina.tribes.tcp.DataSender;
 import org.apache.catalina.tribes.tcp.SenderState;
+import org.apache.catalina.tribes.tcp.SinglePointSender;
+import java.net.Inet4Address;
 
 /**
  * Send cluster messages with a pool of sockets (25).
@@ -34,7 +36,7 @@
  * @version 1.2
  */
 
-public class PooledSocketSender extends PooledSender {
+public class PooledSocketSender extends PooledSender implements SinglePointSender {
 
     private static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory
             .getLog(org.apache.catalina.tribes.tcp.bio.PooledSocketSender.class);
@@ -50,9 +52,9 @@
     private String domain;
     private InetAddress host;
     private int port;
-    private SenderState senderState;
-    private int keepAliveMaxRequestCount;
-    private long keepAliveTimeout;
+    private SenderState senderState = new SenderState(SenderState.READY);
+    private int keepAliveMaxRequestCount = -1;
+    private long keepAliveTimeout = 1000*60;
     private long ackTimeout;
     private boolean resend;
     private boolean waitForAck;
@@ -64,6 +66,10 @@
     * @param host replication node tcp address
     * @param port replication node tcp port
     */
+   public PooledSocketSender(String domain,InetAddress host, int port) {
+       this(domain,host,port,25);
+   }
+
     public PooledSocketSender(String domain,InetAddress host, int port, int poolSize) {
         super(poolSize);
         this.host = host;
@@ -109,7 +115,7 @@
         this.keepAliveTimeout = keepAliveTimeout;
     }
 
-    public void setAckTimeout(long ackTimeout) {
+    public void setTimeout(long ackTimeout) {
         this.ackTimeout = ackTimeout;
     }
 
@@ -145,7 +151,7 @@
         return keepAliveTimeout;
     }
 
-    public long getAckTimeout() {
+    public long getTimeout() {
         return ackTimeout;
     }
 
@@ -172,8 +178,7 @@
         //get a socket sender from the pool
         if(!isConnected()) {
             synchronized(this) {
-                if(!isConnected())
-                    connect();
+                if(!isConnected()) connect();
             }
         }
         SinglePointDataSender sender = (SinglePointDataSender)getSender();
@@ -204,7 +209,7 @@
                                                getSenderState() );
             sender.setKeepAliveMaxRequestCount(getKeepAliveMaxRequestCount());
             sender.setKeepAliveTimeout(getKeepAliveTimeout());
-            sender.setAckTimeout(getAckTimeout());
+            sender.setTimeout(getTimeout());
             sender.setWaitForAck(getWaitForAck());
             sender.setResend(isResend());
             sender.setRxBufSize(getRxBufSize());
@@ -212,4 +217,25 @@
             return sender;
 
     }
+    
+    public void setSuspect(boolean suspect) {
+        if ( suspect ) 
+            senderState.setSuspect();
+        else 
+            senderState.setReady();
+    }
+    
+    public boolean getSuspect() {
+        return senderState.isFailing() || senderState.isSuspect();
+    }
+    
+    public InetAddress getAddress() {
+        return getHost();
+    }
+    
+    public void setAddress(InetAddress addr) {
+        setHost(addr);
+    }
+
+
 }

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/SinglePointDataSender.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/SinglePointDataSender.java?rev=382542&r1=382541&r2=382542&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/SinglePointDataSender.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/SinglePointDataSender.java Thu Mar  2 14:58:03 2006
@@ -95,7 +95,7 @@
     /**
      * wait time for ack
      */
-    private long ackTimeout;
+    private long timeout;
 
     /**
      * waitAckTime
@@ -249,12 +249,12 @@
             this.senderState.setReady();
     }
 
-    public long getAckTimeout() {
-        return ackTimeout;
+    public long getTimeout() {
+        return timeout;
     }
 
-    public void setAckTimeout(long ackTimeout) {
-        this.ackTimeout = ackTimeout;
+    public void setTimeout(long ackTimeout) {
+        this.timeout = ackTimeout;
     }
 
     public long getKeepAliveTimeout() {
@@ -431,7 +431,8 @@
      * Name of this SockerSender
      */
     public String toString() {
-        StringBuffer buf = new StringBuffer("DataSender[");
+        StringBuffer buf = new StringBuffer("DataSender[(");
+        buf.append(super.toString()).append(")");
         buf.append(getAddress()).append(":").append(getPort()).append("]");
         return buf.toString();
     }
@@ -447,7 +448,7 @@
            return ;
        try {
             createSocket();
-            if (getWaitForAck()) socket.setSoTimeout((int) ackTimeout);
+            if (getWaitForAck()) socket.setSoTimeout((int) timeout);
             isSocketConnected = true;
             this.keepAliveCount = 0;
             this.keepAliveConnectTime = System.currentTimeMillis();
@@ -470,6 +471,7 @@
         socket = new Socket(getAddress(), getPort());
         socket.setSendBufferSize(getTxBufSize());
         socket.setReceiveBufferSize(getRxBufSize());
+        socket.setSoTimeout((int)timeout);
         this.socketout = socket.getOutputStream();
     }
 
@@ -573,7 +575,7 @@
         try {
             socketout.write(XByteBuffer.createDataPackage((ClusterData)data));
             socketout.flush();
-            if (getWaitForAck()) waitForAck(ackTimeout);
+            if (getWaitForAck()) waitForAck();
         } finally {
             synchronized(this) {
                 isMessageTransferStarted = false ;
@@ -588,7 +590,7 @@
      * @throws java.io.IOException
      * @throws java.net.SocketTimeoutException
      */
-    protected synchronized void waitForAck(long timeout) throws java.io.IOException {
+    protected synchronized void waitForAck() throws java.io.IOException {
         try {
             boolean ackReceived = false;
             ackbuf.clear();
@@ -609,7 +611,7 @@
                 else throw new IOException(sm.getString("IDataSender.ack.wrong",getAddress(), new Integer(socket.getLocalPort())));
             }
         } catch (IOException x) {
-            String errmsg = sm.getString("IDataSender.ack.missing", getAddress(),new Integer(socket.getLocalPort()), new Long(this.ackTimeout));
+            String errmsg = sm.getString("IDataSender.ack.missing", getAddress(),new Integer(socket.getLocalPort()), new Long(this.timeout));
             if ( !this.isSuspect() ) {
                 this.setSuspect(true);
                 if ( log.isWarnEnabled() ) log.warn(errmsg, x);

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ParallelNioSender.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ParallelNioSender.java?rev=382542&r1=382541&r2=382542&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ParallelNioSender.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ParallelNioSender.java Thu Mar  2 14:58:03 2006
@@ -259,5 +259,9 @@
     public void setConnected(boolean connected) {
         this.connected = connected;
     }
+    
+    public boolean checkKeepAlive() {
+        throw new UnsupportedOperationException("Method ParallelNioSender.checkKeepAlive() not implemented");
+    }
 
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org