You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by fh...@apache.org on 2006/03/02 23:58:05 UTC
svn commit: r382542 - in
/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes:
mcast/ tcp/ tcp/bio/ tcp/nio/
Author: fhanik
Date: Thu Mar 2 14:58:03 2006
New Revision: 382542
URL: http://svn.apache.org/viewcvs?rev=382542&view=rev
Log:
Optimized serialization even further.
Implented an abstract sender pool class
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/McastMember.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSenders.properties
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationListener.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationTransmitter.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/SinglePointSender.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/PooledSocketSender.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/SinglePointDataSender.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ParallelNioSender.java
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/McastMember.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/McastMember.java?rev=382542&r1=382541&r2=382542&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/McastMember.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/McastMember.java Thu Mar 2 14:58:03 2006
@@ -81,6 +81,8 @@
* For the local member only
*/
protected transient long serviceStartTime;
+
+ protected transient byte[] dataPkg = null;
/**
* Empty constructor for serialization
@@ -136,6 +138,14 @@
* @throws Exception
*/
public byte[] getData() {
+ long alive=System.currentTimeMillis()-getServiceStartTime();
+
+ //look in cache first
+ if ( dataPkg!=null ) {
+ XByteBuffer.toBytes((long)alive,dataPkg,0);
+ return dataPkg;
+ }
+
//package looks like
//alive - 8 bytes
//port - 4 bytes
@@ -145,7 +155,7 @@
byte[] domaind = this.domain;
byte[] addr = host;
byte[] data = new byte[8+4+addr.length+4+domaind.length];
- long alive=System.currentTimeMillis()-getServiceStartTime();
+
//reduce byte copying
//System.arraycopy(XByteBuffer.toBytes((long)alive),0,data,0,8);
@@ -162,6 +172,7 @@
XByteBuffer.toBytes(domaind.length,data,16);
System.arraycopy(domaind,0,data,20,domaind.length);
+ dataPkg = data;
return data;
}
/**
@@ -369,9 +380,11 @@
public void setDomain(String domain) {
this.domain = domain.getBytes();
+ this.dataPkg = null;
}
public void setPort(int port) {
this.port = port;
+ this.dataPkg = null;
}
public void setServiceStartTime(long serviceStartTime) {
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java?rev=382542&r1=382541&r2=382542&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSender.java Thu Mar 2 14:58:03 2006
@@ -35,5 +35,10 @@
public boolean isConnected();
public void setRxBufSize(int size);
public void setTxBufSize(int size);
+ public boolean checkKeepAlive();
+ public void setTimeout(long timeout);
+ public void setWaitForAck(boolean isWaitForAck);
+
+
}
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSenders.properties
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSenders.properties?rev=382542&r1=382541&r2=382542&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSenders.properties (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/DataSenders.properties Thu Mar 2 14:58:03 2006
@@ -1,4 +1,4 @@
fastasyncqueue=org.apache.catalina.tribes.tcp.bio.FastAsyncSocketSender
synchronous=org.apache.catalina.tribes.tcp.bio.SocketSender
pooled=org.apache.catalina.tribes.tcp.bio.PooledSocketSender
-parallel=org.apache.catalina.tribes.tcp.nio.PooledNioSender
\ No newline at end of file
+parallel=org.apache.catalina.tribes.tcp.nio.PooledParallelSender
\ No newline at end of file
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationListener.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationListener.java?rev=382542&r1=382541&r2=382542&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationListener.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationListener.java Thu Mar 2 14:58:03 2006
@@ -31,9 +31,8 @@
import org.apache.catalina.tribes.MessageListener;
import org.apache.catalina.tribes.io.ListenCallback;
import org.apache.catalina.tribes.io.ObjectReader;
-import org.apache.catalina.tribes.io.XByteBuffer;
+import org.apache.catalina.tribes.tcp.nio.TcpReplicationThread;
import org.apache.catalina.util.StringManager;
-import org.apache.catalina.tribes.tcp.nio.*;
/**
* @author Filip Hanik
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationTransmitter.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationTransmitter.java?rev=382542&r1=382541&r2=382542&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationTransmitter.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationTransmitter.java Thu Mar 2 14:58:03 2006
@@ -154,7 +154,7 @@
/**
* @return The ack timeout
*/
- public long getAckTimeout() {
+ public long getTimeout() {
return ackTimeout;
}
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/SinglePointSender.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/SinglePointSender.java?rev=382542&r1=382541&r2=382542&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/SinglePointSender.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/SinglePointSender.java Thu Mar 2 14:58:03 2006
@@ -33,17 +33,9 @@
public void setPort(int port);
public int getPort();
public void sendMessage(ChannelMessage data) throws ChannelException;
- public boolean isConnected();
public void setSuspect(boolean suspect);
public boolean getSuspect();
- public void setAckTimeout(long timeout);
- public long getAckTimeout();
- public boolean getWaitForAck();
- public void setWaitForAck(boolean isWaitForAck);
- public boolean checkKeepAlive();
public String getDomain() ;
public void setDomain(String domain) ;
- public void setRxBufSize(int size);
- public void setTxBufSize(int size);
}
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/PooledSocketSender.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/PooledSocketSender.java?rev=382542&r1=382541&r2=382542&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/PooledSocketSender.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/PooledSocketSender.java Thu Mar 2 14:58:03 2006
@@ -25,6 +25,8 @@
import org.apache.catalina.tribes.tcp.PooledSender;
import org.apache.catalina.tribes.tcp.DataSender;
import org.apache.catalina.tribes.tcp.SenderState;
+import org.apache.catalina.tribes.tcp.SinglePointSender;
+import java.net.Inet4Address;
/**
* Send cluster messages with a pool of sockets (25).
@@ -34,7 +36,7 @@
* @version 1.2
*/
-public class PooledSocketSender extends PooledSender {
+public class PooledSocketSender extends PooledSender implements SinglePointSender {
private static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory
.getLog(org.apache.catalina.tribes.tcp.bio.PooledSocketSender.class);
@@ -50,9 +52,9 @@
private String domain;
private InetAddress host;
private int port;
- private SenderState senderState;
- private int keepAliveMaxRequestCount;
- private long keepAliveTimeout;
+ private SenderState senderState = new SenderState(SenderState.READY);
+ private int keepAliveMaxRequestCount = -1;
+ private long keepAliveTimeout = 1000*60;
private long ackTimeout;
private boolean resend;
private boolean waitForAck;
@@ -64,6 +66,10 @@
* @param host replication node tcp address
* @param port replication node tcp port
*/
+ public PooledSocketSender(String domain,InetAddress host, int port) {
+ this(domain,host,port,25);
+ }
+
public PooledSocketSender(String domain,InetAddress host, int port, int poolSize) {
super(poolSize);
this.host = host;
@@ -109,7 +115,7 @@
this.keepAliveTimeout = keepAliveTimeout;
}
- public void setAckTimeout(long ackTimeout) {
+ public void setTimeout(long ackTimeout) {
this.ackTimeout = ackTimeout;
}
@@ -145,7 +151,7 @@
return keepAliveTimeout;
}
- public long getAckTimeout() {
+ public long getTimeout() {
return ackTimeout;
}
@@ -172,8 +178,7 @@
//get a socket sender from the pool
if(!isConnected()) {
synchronized(this) {
- if(!isConnected())
- connect();
+ if(!isConnected()) connect();
}
}
SinglePointDataSender sender = (SinglePointDataSender)getSender();
@@ -204,7 +209,7 @@
getSenderState() );
sender.setKeepAliveMaxRequestCount(getKeepAliveMaxRequestCount());
sender.setKeepAliveTimeout(getKeepAliveTimeout());
- sender.setAckTimeout(getAckTimeout());
+ sender.setTimeout(getTimeout());
sender.setWaitForAck(getWaitForAck());
sender.setResend(isResend());
sender.setRxBufSize(getRxBufSize());
@@ -212,4 +217,25 @@
return sender;
}
+
+ public void setSuspect(boolean suspect) {
+ if ( suspect )
+ senderState.setSuspect();
+ else
+ senderState.setReady();
+ }
+
+ public boolean getSuspect() {
+ return senderState.isFailing() || senderState.isSuspect();
+ }
+
+ public InetAddress getAddress() {
+ return getHost();
+ }
+
+ public void setAddress(InetAddress addr) {
+ setHost(addr);
+ }
+
+
}
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/SinglePointDataSender.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/SinglePointDataSender.java?rev=382542&r1=382541&r2=382542&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/SinglePointDataSender.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/SinglePointDataSender.java Thu Mar 2 14:58:03 2006
@@ -95,7 +95,7 @@
/**
* wait time for ack
*/
- private long ackTimeout;
+ private long timeout;
/**
* waitAckTime
@@ -249,12 +249,12 @@
this.senderState.setReady();
}
- public long getAckTimeout() {
- return ackTimeout;
+ public long getTimeout() {
+ return timeout;
}
- public void setAckTimeout(long ackTimeout) {
- this.ackTimeout = ackTimeout;
+ public void setTimeout(long ackTimeout) {
+ this.timeout = ackTimeout;
}
public long getKeepAliveTimeout() {
@@ -431,7 +431,8 @@
* Name of this SockerSender
*/
public String toString() {
- StringBuffer buf = new StringBuffer("DataSender[");
+ StringBuffer buf = new StringBuffer("DataSender[(");
+ buf.append(super.toString()).append(")");
buf.append(getAddress()).append(":").append(getPort()).append("]");
return buf.toString();
}
@@ -447,7 +448,7 @@
return ;
try {
createSocket();
- if (getWaitForAck()) socket.setSoTimeout((int) ackTimeout);
+ if (getWaitForAck()) socket.setSoTimeout((int) timeout);
isSocketConnected = true;
this.keepAliveCount = 0;
this.keepAliveConnectTime = System.currentTimeMillis();
@@ -470,6 +471,7 @@
socket = new Socket(getAddress(), getPort());
socket.setSendBufferSize(getTxBufSize());
socket.setReceiveBufferSize(getRxBufSize());
+ socket.setSoTimeout((int)timeout);
this.socketout = socket.getOutputStream();
}
@@ -573,7 +575,7 @@
try {
socketout.write(XByteBuffer.createDataPackage((ClusterData)data));
socketout.flush();
- if (getWaitForAck()) waitForAck(ackTimeout);
+ if (getWaitForAck()) waitForAck();
} finally {
synchronized(this) {
isMessageTransferStarted = false ;
@@ -588,7 +590,7 @@
* @throws java.io.IOException
* @throws java.net.SocketTimeoutException
*/
- protected synchronized void waitForAck(long timeout) throws java.io.IOException {
+ protected synchronized void waitForAck() throws java.io.IOException {
try {
boolean ackReceived = false;
ackbuf.clear();
@@ -609,7 +611,7 @@
else throw new IOException(sm.getString("IDataSender.ack.wrong",getAddress(), new Integer(socket.getLocalPort())));
}
} catch (IOException x) {
- String errmsg = sm.getString("IDataSender.ack.missing", getAddress(),new Integer(socket.getLocalPort()), new Long(this.ackTimeout));
+ String errmsg = sm.getString("IDataSender.ack.missing", getAddress(),new Integer(socket.getLocalPort()), new Long(this.timeout));
if ( !this.isSuspect() ) {
this.setSuspect(true);
if ( log.isWarnEnabled() ) log.warn(errmsg, x);
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ParallelNioSender.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ParallelNioSender.java?rev=382542&r1=382541&r2=382542&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ParallelNioSender.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/ParallelNioSender.java Thu Mar 2 14:58:03 2006
@@ -259,5 +259,9 @@
public void setConnected(boolean connected) {
this.connected = connected;
}
+
+ public boolean checkKeepAlive() {
+ throw new UnsupportedOperationException("Method ParallelNioSender.checkKeepAlive() not implemented");
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org