You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by fh...@apache.org on 2003/12/19 22:22:14 UTC
cvs commit: jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp ReplicationTransmitter.java SocketSender.java TcpReplicationThread.java
fhanik 2003/12/19 13:22:14
Modified: modules/cluster/src/share/org/apache/catalina/cluster/io
Jdk13ObjectReader.java ObjectReader.java
XByteBuffer.java
modules/cluster/src/share/org/apache/catalina/cluster/tcp
ReplicationTransmitter.java SocketSender.java
TcpReplicationThread.java
Log:
Implemented GZIP compression of the data sent over the wire.
Revision Changes Path
1.2 +5 -5 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/io/Jdk13ObjectReader.java
Index: Jdk13ObjectReader.java
===================================================================
RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/io/Jdk13ObjectReader.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- Jdk13ObjectReader.java 18 Dec 2003 04:20:14 -0000 1.1
+++ Jdk13ObjectReader.java 19 Dec 2003 21:22:13 -0000 1.2
@@ -93,7 +93,7 @@
this.buffer = new XByteBuffer();
}
- public int append(byte[] data,int off,int len) {
+ public int append(byte[] data,int off,int len) throws java.io.IOException {
boolean result = false;
buffer.append(data,off,len);
int pkgCnt = 0;
@@ -107,7 +107,7 @@
return pkgCnt;
}
- public int execute() {
+ public int execute() throws java.io.IOException {
return append(new byte[0],0,0);
}
1.3 +6 -6 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/io/ObjectReader.java
Index: ObjectReader.java
===================================================================
RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/io/ObjectReader.java,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -r1.2 -r1.3
--- ObjectReader.java 18 Dec 2003 04:20:14 -0000 1.2
+++ ObjectReader.java 19 Dec 2003 21:22:13 -0000 1.3
@@ -102,7 +102,7 @@
return this.channel;
}
- public int append(byte[] data,int off,int len) {
+ public int append(byte[] data,int off,int len) throws java.io.IOException {
boolean result = false;
buffer.append(data,off,len);
int pkgCnt = 0;
@@ -116,7 +116,7 @@
return pkgCnt;
}
- public int execute() {
+ public int execute() throws java.io.IOException {
return append(new byte[0],0,0);
}
1.3 +28 -9 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/io/XByteBuffer.java
Index: XByteBuffer.java
===================================================================
RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/io/XByteBuffer.java,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -r1.2 -r1.3
--- XByteBuffer.java 16 Nov 2003 22:22:45 -0000 1.2
+++ XByteBuffer.java 19 Dec 2003 21:22:13 -0000 1.3
@@ -214,16 +214,29 @@
* @param clearFromBuffer - if true, the package will be removed from the byte buffer
* @return - returns the actual message bytes (header, size and footer not included).
*/
- public synchronized byte[] extractPackage(boolean clearFromBuffer) {
+ public synchronized byte[] extractPackage(boolean clearFromBuffer) throws java.io.IOException {
int size = packageExists();
if ( size == 0 ) throw new java.lang.IllegalStateException("No package exists in XByteBuffer");
- byte[] result = new byte[size];
- System.arraycopy(buf,START_DATA.length+4,result,0,size);
+ byte[] data = new byte[size];
+ System.arraycopy(buf,START_DATA.length+4,data,0,size);
if ( clearFromBuffer ) {
int totalsize = START_DATA.length + 4 + size + END_DATA.length;
bufSize = bufSize - totalsize;
System.arraycopy(buf, totalsize, buf, 0, bufSize);
}
+ java.io.ByteArrayInputStream bin = new java.io.ByteArrayInputStream(data);
+ java.util.zip.GZIPInputStream gin = new java.util.zip.GZIPInputStream(bin);
+ byte[] tmp = new byte[1024];
+ byte[] result = new byte[0];
+ int length = gin.read(tmp);
+ while ( length > 0 ) {
+ byte[] tmpdata = result;
+ result = new byte[result.length+length];
+ System.arraycopy(tmpdata,0,result,0,tmpdata.length);
+ System.arraycopy(tmp,0,result,tmpdata.length,length);
+ length = gin.read(tmp);
+ }
+ gin.close();
return result;
}//extractPackage
@@ -354,7 +367,13 @@
* @param data - the message data to be contained within the package
* @return - a full package (header,size,data,footer)
*/
- public static byte[] createDataPackage(byte[] data) {
+ public static byte[] createDataPackage(byte[] indata) throws java.io.IOException {
+ java.io.ByteArrayOutputStream bout = new java.io.ByteArrayOutputStream(indata.length/2);
+ java.util.zip.GZIPOutputStream gout = new java.util.zip.GZIPOutputStream(bout);
+ gout.write(indata);
+ gout.flush();
+ gout.close();
+ byte[] data = bout.toByteArray();
byte[] result = new byte[START_DATA.length+4+data.length+END_DATA.length];
System.arraycopy(START_DATA,0,result,0,START_DATA.length);
System.arraycopy(toBytes(data.length),0,result,START_DATA.length,4);
@@ -378,4 +397,4 @@
System.out.println("After=" + toLong(d, 0));
}
-}//class
\ No newline at end of file
+}//class
1.9 +16 -3 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationTransmitter.java
Index: ReplicationTransmitter.java
===================================================================
RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationTransmitter.java,v
retrieving revision 1.8
retrieving revision 1.9
diff -u -r1.8 -r1.9
--- ReplicationTransmitter.java 18 Dec 2003 04:20:15 -0000 1.8
+++ ReplicationTransmitter.java 19 Dec 2003 21:22:13 -0000 1.9
@@ -77,6 +77,18 @@
for ( int i=0; i<senders.length; i++)
map.put(senders[i].getAddress().getHostAddress()+":"+senders[i].getPort(),senders[i]);
}
+
+ private static long nrOfRequests = 0;
+ private static long totalBytes = 0;
+ private static synchronized void addStats(int length) {
+ nrOfRequests++;
+ totalBytes+=length;
+ if ( (nrOfRequests % 100) == 0 ) {
+ log.info("Nr of bytes sent="+totalBytes+" over "+nrOfRequests+" =="+(totalBytes/nrOfRequests)+" bytes/request");
+ }
+
+ }
+
public synchronized void add(IDataSender sender)
{
String key = sender.getAddress().getHostAddress()+":"+sender.getPort();
@@ -132,6 +144,7 @@
if (!sender.isConnected())
sender.connect();
sender.sendMessage(sessionId,data);
+ addStats(data.length);
sender.setSuspect(false);
}catch ( Exception x)
{
1.7 +4 -4 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/SocketSender.java
Index: SocketSender.java
===================================================================
RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/SocketSender.java,v
retrieving revision 1.6
retrieving revision 1.7
diff -u -r1.6 -r1.7
--- SocketSender.java 18 Dec 2003 04:20:15 -0000 1.6
+++ SocketSender.java 19 Dec 2003 21:22:13 -0000 1.7
@@ -108,7 +108,7 @@
public void connect() throws java.io.IOException
{
sc = new Socket(getAddress(),getPort());
- sc.setSoTimeout((int)ackTimeout);
+ //sc.setSoTimeout((int)ackTimeout);
isSocketConnected = true;
this.keepAliveCount = 0;
this.keepAliveConnectTime = System.currentTimeMillis();
1.4 +4 -6 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/TcpReplicationThread.java
Index: TcpReplicationThread.java
===================================================================
RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/TcpReplicationThread.java,v
retrieving revision 1.3
retrieving revision 1.4
diff -u -r1.3 -r1.4
--- TcpReplicationThread.java 18 Dec 2003 04:20:15 -0000 1.3
+++ TcpReplicationThread.java 19 Dec 2003 21:22:13 -0000 1.4
@@ -197,8 +197,6 @@
private void sendAck(SelectionKey key, SocketChannel channel) throws java.io.IOException {
//send a reply-acknowledgement
- java.nio.ByteBuffer buf = java.nio.ByteBuffer.wrap(new byte[] {6,2,3});
- channel.write(buf);
- buf.clear();
+ channel.write(ByteBuffer.wrap(new byte[] {6,2,3}));
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: tomcat-dev-unsubscribe@jakarta.apache.org
For additional commands, e-mail: tomcat-dev-help@jakarta.apache.org