You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by fh...@apache.org on 2006/03/16 13:47:47 UTC
svn commit: r386320 - in
/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp:
DataSender.java bio/BioSender.java bio/MultipointBioSender.java
nio/NioSender.java
Author: fhanik
Date: Thu Mar 16 04:47:46 2006
New Revision: 386320
URL: http://svn.apache.org/viewcvs?rev=386320&view=rev
Log:
implemented keepalive for the nio sockets
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioSender.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/MultipointBioSender.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioSender.java
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java?rev=386320&r1=386319&r2=386320&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java Thu Mar 16 04:47:46 2006
@@ -39,4 +39,6 @@
public void setTimeout(long timeout);
public void setWaitForAck(boolean isWaitForAck);
public void setKeepAliveCount(int maxRequests);
+ public void setKeepAliveTime(long keepAliveTimeInMs);
+
}
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioSender.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioSender.java?rev=386320&r1=386319&r2=386320&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioSender.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioSender.java Thu Mar 16 04:47:46 2006
@@ -17,20 +17,20 @@
package org.apache.catalina.tribes.tcp.bio;
import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
+import java.net.UnknownHostException;
import java.util.Arrays;
+import org.apache.catalina.tribes.Member;
import org.apache.catalina.tribes.io.XByteBuffer;
import org.apache.catalina.tribes.tcp.Constants;
import org.apache.catalina.tribes.tcp.DataSender;
import org.apache.catalina.tribes.tcp.SenderState;
import org.apache.catalina.util.StringManager;
-import java.io.OutputStream;
-import java.io.InputStream;
-import org.apache.catalina.tribes.Member;
-import java.net.UnknownHostException;
/**
* Send cluster messages with only one socket. Ack and keep Alive Handling is
@@ -95,7 +95,7 @@
/**
* keep socket open for no more than one min
*/
- private long keepAliveTimeout = 60 * 1000;
+ private long keepAliveTime = -1;
/**
* max requests before reconnecting (default -1 unlimited)
@@ -196,14 +196,6 @@
this.timeout = ackTimeout;
}
- public long getKeepAliveTimeout() {
- return keepAliveTimeout;
- }
-
- public void setKeepAliveTimeout(long keepAliveTimeout) {
- this.keepAliveTimeout = keepAliveTimeout;
- }
-
public int getKeepAliveCount() {
return keepAliveCount;
}
@@ -270,6 +262,10 @@
this.txBufSize = txBufSize;
}
+ public void setKeepAliveTime(long keepAliveTime) {
+ this.keepAliveTime = keepAliveTime;
+ }
+
// --------------------------------------------------------- Public Methods
/**
@@ -307,7 +303,7 @@
public boolean keepalive() {
boolean isCloseSocket = true ;
if(isConnected()) {
- if ((keepAliveTimeout > -1 && (System.currentTimeMillis() - keepAliveConnectTime) > keepAliveTimeout)
+ if ((keepAliveTime > -1 && (System.currentTimeMillis() - keepAliveConnectTime) > keepAliveTime)
|| (keepAliveCount > -1 && requestCount >= keepAliveCount)) {
closeSocket();
} else
@@ -444,7 +440,6 @@
keepalive();
if ( reconnect ) closeSocket();
if (!isConnected()) openSocket();
- else if(keepAliveTimeout > -1) this.keepAliveConnectTime = System.currentTimeMillis();
writeData(data);
}
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/MultipointBioSender.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/MultipointBioSender.java?rev=386320&r1=386319&r2=386320&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/MultipointBioSender.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/MultipointBioSender.java Thu Mar 16 04:47:46 2006
@@ -38,6 +38,7 @@
protected int txBufSize = 25188;
private boolean connected;
private boolean autoConnect;
+ private long keepAliveTime = -1;
public synchronized void sendMessage(Member[] destination, ChannelMessage msg) throws ChannelException {
long start = System.currentTimeMillis();
@@ -66,6 +67,7 @@
if (sender == null) {
sender = new BioSender(destination[i], rxBufSize, txBufSize);
sender.setKeepAliveCount(keepAliveCount);
+ sender.setKeepAliveTime(keepAliveTime);
sender.setTimeout(timeout);
//sender.setResend();
//sender.setKeepAliveTimeout();
@@ -135,6 +137,10 @@
return autoConnect;
}
+ public long getKeepAliveTime() {
+ return keepAliveTime;
+ }
+
public void setUseDirectBuffer(boolean directBuf) {
this.directBuf = directBuf;
}
@@ -169,6 +175,10 @@
public void setKeepAliveCount(int keepAliveCount) {
this.keepAliveCount = keepAliveCount;
+ }
+
+ public void setKeepAliveTime(long keepAliveTime) {
+ this.keepAliveTime = keepAliveTime;
}
public boolean keepalive() {
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioSender.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioSender.java?rev=386320&r1=386319&r2=386320&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioSender.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioSender.java Thu Mar 16 04:47:46 2006
@@ -69,7 +69,10 @@
protected int remaining = 0;
protected boolean complete;
protected int attempt;
- protected int keepAliveCount;
+ protected int keepAliveCount = -1;
+ protected int requestCount = 0;
+ protected long connectTime;
+ protected long keepAliveTime = -1;
protected long timeout;
public NioSender(Member destination) {
@@ -90,6 +93,8 @@
if ( socketChannel.finishConnect() ) {
//we connected, register ourselves for writing
connected = true;
+ requestCount = 0;
+ connectTime = System.currentTimeMillis();
socketChannel.socket().setSendBufferSize(txBufSize);
socketChannel.socket().setReceiveBufferSize(rxBufSize);
socketChannel.socket().setSoTimeout((int)timeout);
@@ -99,32 +104,39 @@
//wait for the connection to finish
key.interestOps(key.interestOps() | SelectionKey.OP_CONNECT);
return false;
- }
+ }//end if
} else if ( key.isWritable() ) {
boolean writecomplete = write(key);
if ( writecomplete ) {
//we are completed, should we read an ack?
- if ( waitForAck ) key.interestOps(key.interestOps()|SelectionKey.OP_READ);
- //if not, we are ready, setMessage will reregister us for another write interest
- else {
+ if ( waitForAck ) {
+ //register to read the ack
+ key.interestOps(key.interestOps() | SelectionKey.OP_READ);
+ } else {
+ //if not, we are ready, setMessage will reregister us for another write interest
//do a health check, we have no way of verify a disconnected
//socket since we don't register for OP_READ on waitForAck=false
read(key);//this causes overhead.
+ requestCount++;
return true;
}
} else {
//we are not complete, lets write some more
key.interestOps(key.interestOps()|SelectionKey.OP_WRITE);
- }
+ }//end if
} else if ( key.isReadable() ) {
boolean readcomplete = read(key);
- if ( readcomplete ) return true;
- else key.interestOps(key.interestOps()|SelectionKey.OP_READ);
+ if ( readcomplete ) {
+ requestCount++;
+ return true;
+ } else {
+ key.interestOps(key.interestOps() | SelectionKey.OP_READ);
+ }//end if
} else {
//unknown state, should never happen
log.warn("Data is in unknown state. readyOps="+ops);
throw new IOException("Data is in unknown state. readyOps="+ops);
- }
+ }//end if
return false;
}
@@ -193,7 +205,6 @@
socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
socketChannel.connect(addr);
-
socketChannel.register(getSelector(),SelectionKey.OP_CONNECT,this);
}
@@ -236,6 +247,8 @@
remaining = 0;
complete = false;
attempt = 0;
+ requestCount = 0;
+ connectTime = -1;
}
private ByteBuffer getReadBuffer() {
@@ -273,7 +286,11 @@
* @todo Implement this org.apache.catalina.tribes.tcp.IDataSender method
*/
public boolean keepalive() {
- return false;
+ // Recycle the connection when its request budget or its age limit (keepAliveTime, ms) is exceeded; a negative value disables that limit.
+ boolean disconnect = ( keepAliveCount >= 0 && requestCount>keepAliveCount );
+ if ( !disconnect && keepAliveTime >= 0 ) disconnect = (System.currentTimeMillis()-connectTime) > keepAliveTime; // elapsed time must exceed the limit, not the reverse
+ if ( disconnect ) disconnect();
+ return disconnect;
}
/**
* isConnected
@@ -370,5 +387,9 @@
public void setTimeout(long timeout) {
this.timeout = timeout;
+ }
+
+ public void setKeepAliveTime(long keepAliveTime) {
+ this.keepAliveTime = keepAliveTime;
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org