You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by fh...@apache.org on 2006/03/16 13:47:47 UTC

svn commit: r386320 - in /tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp: DataSender.java bio/BioSender.java bio/MultipointBioSender.java nio/NioSender.java

Author: fhanik
Date: Thu Mar 16 04:47:46 2006
New Revision: 386320

URL: http://svn.apache.org/viewcvs?rev=386320&view=rev
Log:
implemented keepalive for the nio sockets

Modified:
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioSender.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/MultipointBioSender.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioSender.java

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java?rev=386320&r1=386319&r2=386320&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java Thu Mar 16 04:47:46 2006
@@ -39,4 +39,6 @@
     public void setTimeout(long timeout);
     public void setWaitForAck(boolean isWaitForAck);
     public void setKeepAliveCount(int maxRequests);
+    public void setKeepAliveTime(long keepAliveTimeInMs);
+    
 }

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioSender.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioSender.java?rev=386320&r1=386319&r2=386320&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioSender.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioSender.java Thu Mar 16 04:47:46 2006
@@ -17,20 +17,20 @@
 package org.apache.catalina.tribes.tcp.bio;
 
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.Socket;
+import java.net.UnknownHostException;
 import java.util.Arrays;
 
+import org.apache.catalina.tribes.Member;
 import org.apache.catalina.tribes.io.XByteBuffer;
 import org.apache.catalina.tribes.tcp.Constants;
 import org.apache.catalina.tribes.tcp.DataSender;
 import org.apache.catalina.tribes.tcp.SenderState;
 import org.apache.catalina.util.StringManager;
-import java.io.OutputStream;
-import java.io.InputStream;
-import org.apache.catalina.tribes.Member;
-import java.net.UnknownHostException;
 
 /**
  * Send cluster messages with only one socket. Ack and keep Alive Handling is
@@ -95,7 +95,7 @@
     /**
      * keep socket open for no more than one min
      */
-    private long keepAliveTimeout = 60 * 1000;
+    private long keepAliveTime = -1;
 
     /**
      * max requests before reconnecting (default -1 unlimited)
@@ -196,14 +196,6 @@
         this.timeout = ackTimeout;
     }
 
-    public long getKeepAliveTimeout() {
-        return keepAliveTimeout;
-    }
-
-    public void setKeepAliveTimeout(long keepAliveTimeout) {
-        this.keepAliveTimeout = keepAliveTimeout;
-    }
-
     public int getKeepAliveCount() {
         return keepAliveCount;
     }
@@ -270,6 +262,10 @@
         this.txBufSize = txBufSize;
     }
 
+    public void setKeepAliveTime(long keepAliveTime) {
+        this.keepAliveTime = keepAliveTime;
+    }
+
     // --------------------------------------------------------- Public Methods
 
     /**
@@ -307,7 +303,7 @@
     public  boolean keepalive() {
         boolean isCloseSocket = true ;
         if(isConnected()) {
-            if ((keepAliveTimeout > -1 && (System.currentTimeMillis() - keepAliveConnectTime) > keepAliveTimeout)
+            if ((keepAliveTime > -1 && (System.currentTimeMillis() - keepAliveConnectTime) > keepAliveTime)
                 || (keepAliveCount > -1 && requestCount >= keepAliveCount)) {
                     closeSocket();
            } else
@@ -444,7 +440,6 @@
         keepalive();
         if ( reconnect ) closeSocket();
         if (!isConnected()) openSocket();
-        else if(keepAliveTimeout > -1) this.keepAliveConnectTime = System.currentTimeMillis();
         writeData(data);
     }
     

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/MultipointBioSender.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/MultipointBioSender.java?rev=386320&r1=386319&r2=386320&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/MultipointBioSender.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/MultipointBioSender.java Thu Mar 16 04:47:46 2006
@@ -38,6 +38,7 @@
     protected int txBufSize = 25188;
     private boolean connected;
     private boolean autoConnect;
+    private long keepAliveTime = -1;
 
     public synchronized void sendMessage(Member[] destination, ChannelMessage msg) throws ChannelException {
         long start = System.currentTimeMillis();
@@ -66,6 +67,7 @@
                 if (sender == null) {
                     sender = new BioSender(destination[i], rxBufSize, txBufSize);
                     sender.setKeepAliveCount(keepAliveCount);
+                    sender.setKeepAliveTime(keepAliveTime);
                     sender.setTimeout(timeout);
                     //sender.setResend();
                     //sender.setKeepAliveTimeout();
@@ -135,6 +137,10 @@
         return autoConnect;
     }
 
+    public long getKeepAliveTime() {
+        return keepAliveTime;
+    }
+
     public void setUseDirectBuffer(boolean directBuf) {
         this.directBuf = directBuf;
     }
@@ -169,6 +175,10 @@
 
     public void setKeepAliveCount(int keepAliveCount) {
         this.keepAliveCount = keepAliveCount;
+    }
+
+    public void setKeepAliveTime(long keepAliveTime) {
+        this.keepAliveTime = keepAliveTime;
     }
 
     public boolean keepalive() {

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioSender.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioSender.java?rev=386320&r1=386319&r2=386320&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioSender.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioSender.java Thu Mar 16 04:47:46 2006
@@ -69,7 +69,10 @@
     protected int remaining = 0;
     protected boolean complete;
     protected int attempt;
-    protected int keepAliveCount;
+    protected int keepAliveCount = -1;
+    protected int requestCount = 0;
+    protected long connectTime;
+    protected long keepAliveTime = -1;
     protected long timeout;
 
     public NioSender(Member destination) {
@@ -90,6 +93,8 @@
             if ( socketChannel.finishConnect() ) {
                 //we connected, register ourselves for writing
                 connected = true;
+                requestCount = 0;
+                connectTime = System.currentTimeMillis();
                 socketChannel.socket().setSendBufferSize(txBufSize);
                 socketChannel.socket().setReceiveBufferSize(rxBufSize);
                 socketChannel.socket().setSoTimeout((int)timeout);
@@ -99,32 +104,39 @@
                 //wait for the connection to finish
                 key.interestOps(key.interestOps() | SelectionKey.OP_CONNECT);
                 return false;
-            }
+            }//end if
         } else if ( key.isWritable() ) {
             boolean writecomplete = write(key);
             if ( writecomplete ) {
                 //we are completed, should we read an ack?
-                if ( waitForAck ) key.interestOps(key.interestOps()|SelectionKey.OP_READ);
-                //if not, we are ready, setMessage will reregister us for another write interest
-                else {
+                if ( waitForAck ) {
+                    //register to read the ack
+                    key.interestOps(key.interestOps() | SelectionKey.OP_READ);
+                } else {
+                    //if not, we are ready, setMessage will reregister us for another write interest
                     //do a health check, we have no way of verify a disconnected
                     //socket since we don't register for OP_READ on waitForAck=false
                     read(key);//this causes overhead.
+                    requestCount++;
                     return true;
                 }
             } else {
                 //we are not complete, lets write some more
                 key.interestOps(key.interestOps()|SelectionKey.OP_WRITE);
-            }
+            }//end if
         } else if ( key.isReadable() ) {
             boolean readcomplete = read(key);
-            if ( readcomplete ) return true;
-            else key.interestOps(key.interestOps()|SelectionKey.OP_READ);
+            if ( readcomplete ) {
+                requestCount++;
+                return true;
+            } else {
+                key.interestOps(key.interestOps() | SelectionKey.OP_READ);
+            }//end if
         } else {
             //unknown state, should never happen
             log.warn("Data is in unknown state. readyOps="+ops);
             throw new IOException("Data is in unknown state. readyOps="+ops);
-        }
+        }//end if
         return false;
     }
     
@@ -193,7 +205,6 @@
         socketChannel = SocketChannel.open();
         socketChannel.configureBlocking(false);
         socketChannel.connect(addr);
-        
         socketChannel.register(getSelector(),SelectionKey.OP_CONNECT,this);
     }
     
@@ -236,6 +247,8 @@
         remaining = 0;
         complete = false;
         attempt = 0;
+        requestCount = 0;
+        connectTime = -1;
     }
 
     private ByteBuffer getReadBuffer() {
@@ -273,7 +286,11 @@
      * @todo Implement this org.apache.catalina.tribes.tcp.IDataSender method
      */
     public boolean keepalive() {
-        return false;
+        boolean disconnect = false; //set to true once either keepalive limit (request count or connection age) is exceeded
+        if ( keepAliveCount >= 0 && requestCount>keepAliveCount ) disconnect = true;
+        else if ( keepAliveTime >= 0 && (System.currentTimeMillis()-connectTime) > keepAliveTime ) disconnect = true; //age of the connection exceeds keepAliveTime
+        if ( disconnect ) disconnect();
+        return disconnect;
     }
     /**
      * isConnected
@@ -370,5 +387,9 @@
 
     public void setTimeout(long timeout) {
         this.timeout = timeout;
+    }
+
+    public void setKeepAliveTime(long keepAliveTime) {
+        this.keepAliveTime = keepAliveTime;
     }
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org