You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by fh...@apache.org on 2006/07/01 21:53:17 UTC
svn commit: r418503 - in
/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes:
io/ transport/ transport/bio/ transport/nio/
Author: fhanik
Date: Sat Jul 1 12:53:16 2006
New Revision: 418503
URL: http://svn.apache.org/viewvc?rev=418503&view=rev
Log:
Improved NioReceiver performance by almost 50%; it now handles concurrency much better.
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ChannelData.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/ReceiverBase.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/BioReceiver.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/BioSender.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReceiver.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReplicationThread.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioSender.java
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ChannelData.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ChannelData.java?rev=418503&r1=418502&r2=418503&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ChannelData.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ChannelData.java Sat Jul 1 12:53:16 2006
@@ -215,6 +215,30 @@
* @param b byte[]
* @return ChannelData
*/
+ public static ChannelData getDataFromPackage(XByteBuffer xbuf) {
+ ChannelData data = new ChannelData(false);
+ int offset = 0;
+ data.setOptions(XByteBuffer.toInt(xbuf.getBytesDirect(),offset));
+ offset += 4; //options
+ data.setTimestamp(XByteBuffer.toLong(xbuf.getBytesDirect(),offset));
+ offset += 8; //timestamp
+ data.uniqueId = new byte[XByteBuffer.toInt(xbuf.getBytesDirect(),offset)];
+ offset += 4; //uniqueId length
+ System.arraycopy(xbuf.getBytesDirect(),offset,data.uniqueId,0,data.uniqueId.length);
+ offset += data.uniqueId.length; //uniqueId data
+ byte[] addr = new byte[XByteBuffer.toInt(xbuf.getBytesDirect(),offset)];
+ offset += 4; //addr length
+ System.arraycopy(xbuf.getBytesDirect(),offset,addr,0,addr.length);
+ data.setAddress(MemberImpl.getMember(addr));
+ offset += addr.length; //addr data
+ int xsize = XByteBuffer.toInt(xbuf.getBytesDirect(),offset);
+ System.arraycopy(xbuf.getBytesDirect(),offset,xbuf.getBytesDirect(),0,xsize);
+ xbuf.setLength(xsize);
+ data.message = xbuf;
+ return data;
+
+ }
+
public static ChannelData getDataFromPackage(byte[] b) {
ChannelData data = new ChannelData(false);
int offset = 0;
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java?rev=418503&r1=418502&r2=418503&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java Sat Jul 1 12:53:16 2006
@@ -113,6 +113,11 @@
public int getLength() {
return bufSize;
}
+
+ public void setLength(int size) {
+ if ( size > buf.length ) throw new ArrayIndexOutOfBoundsException("Size is larger than existing buffer.");
+ bufSize = size;
+ }
public void trim(int length) {
if ( (bufSize - length) < 0 )
@@ -307,24 +312,24 @@
* @param clearFromBuffer - if true, the package will be removed from the byte buffer
* @return - returns the actual message bytes (header, compress,size and footer not included).
*/
- public byte[] extractDataPackage(boolean clearFromBuffer) {
+ public XByteBuffer extractDataPackage(boolean clearFromBuffer) {
int psize = countPackages(true);
if (psize == 0) throw new java.lang.IllegalStateException("No package exists in XByteBuffer");
int size = toInt(buf, START_DATA.length);
- byte[] data = new byte[size];
- System.arraycopy(buf, START_DATA.length + 4, data, 0, size);
+ XByteBuffer xbuf = BufferPool.getBufferPool().getBuffer(size,false);
+ System.arraycopy(buf, START_DATA.length + 4, xbuf.getBytesDirect(), 0, size);
if (clearFromBuffer) {
int totalsize = START_DATA.length + 4 + size + END_DATA.length;
bufSize = bufSize - totalsize;
System.arraycopy(buf, totalsize, buf, 0, bufSize);
}
- return data;
+ return xbuf;
}
public ChannelData extractPackage(boolean clearFromBuffer) throws java.io.IOException {
- byte[] data = extractDataPackage(clearFromBuffer);
- ChannelData cdata = ChannelData.getDataFromPackage(data);
+ XByteBuffer xbuf = extractDataPackage(clearFromBuffer);
+ ChannelData cdata = ChannelData.getDataFromPackage(xbuf);
return cdata;
}
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/ReceiverBase.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/ReceiverBase.java?rev=418503&r1=418502&r2=418503&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/ReceiverBase.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/ReceiverBase.java Sat Jul 1 12:53:16 2006
@@ -54,7 +54,7 @@
private boolean listen = false;
private ThreadPool pool;
private boolean direct = true;
- private long tcpSelectorTimeout = 100;
+ private long tcpSelectorTimeout = 5000;
//how many times to search for an available socket
private int autoBind = 10;
private int maxThreads = 25;
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/BioReceiver.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/BioReceiver.java?rev=418503&r1=418502&r2=418503&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/BioReceiver.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/BioReceiver.java Sat Jul 1 12:53:16 2006
@@ -54,7 +54,7 @@
*/
public void start() throws IOException {
try {
- setPool(new ThreadPool(new Object(),getMaxThreads(),getMinThreads(),this));
+ setPool(new ThreadPool(getMaxThreads(),getMinThreads(),this));
} catch (Exception x) {
log.fatal("ThreadPool can initilzed. Listener not started", x);
if ( x instanceof IOException ) throw (IOException)x;
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/BioSender.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/BioSender.java?rev=418503&r1=418502&r2=418503&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/BioSender.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/BioSender.java Sat Jul 1 12:53:16 2006
@@ -275,7 +275,7 @@
byte d = (byte)i;
ackbuf.append(d);
if (ackbuf.doesPackageExist() ) {
- byte[] ackcmd = ackbuf.extractDataPackage(true);
+ byte[] ackcmd = ackbuf.extractDataPackage(true).getBytes();
ackReceived = Arrays.equals(ackcmd,org.apache.catalina.tribes.transport.Constants.ACK_DATA);
failAckReceived = Arrays.equals(ackcmd,org.apache.catalina.tribes.transport.Constants.FAIL_ACK_DATA);
ackReceived = ackReceived || failAckReceived;
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReceiver.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReceiver.java?rev=418503&r1=418502&r2=418503&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReceiver.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReceiver.java Sat Jul 1 12:53:16 2006
@@ -33,6 +33,7 @@
import org.apache.catalina.tribes.transport.ThreadPool;
import org.apache.catalina.tribes.transport.WorkerThread;
import org.apache.catalina.tribes.util.StringManager;
+import java.util.LinkedList;
/**
* @author Filip Hanik
@@ -55,8 +56,8 @@
private Selector selector = null;
private ServerSocketChannel serverChannel = null;
-
- private Object interestOpsMutex = new Object();
+ protected LinkedList events = new LinkedList();
+// private Object interestOpsMutex = new Object();
public NioReceiver() {
}
@@ -70,9 +71,9 @@
return (info);
}
- public Object getInterestOpsMutex() {
- return interestOpsMutex;
- }
+// public Object getInterestOpsMutex() {
+// return interestOpsMutex;
+// }
public void stop() {
this.stopListening();
@@ -85,7 +86,8 @@
*/
public void start() throws IOException {
try {
- setPool(new ThreadPool(interestOpsMutex, getMaxThreads(),getMinThreads(),this));
+// setPool(new ThreadPool(interestOpsMutex, getMaxThreads(),getMinThreads(),this));
+ setPool(new ThreadPool(getMaxThreads(),getMinThreads(),this));
} catch (Exception x) {
log.fatal("ThreadPool can initilzed. Listener not started", x);
if ( x instanceof IOException ) throw (IOException)x;
@@ -105,7 +107,7 @@
}
public WorkerThread getWorkerThread() {
- NioReplicationThread thread = new NioReplicationThread(this);
+ NioReplicationThread thread = new NioReplicationThread(this,this);
thread.setUseBufferPool(this.getUseBufferPool());
thread.setRxBufSize(getRxBufSize());
thread.setOptions(getWorkerThreadOptions());
@@ -130,6 +132,31 @@
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
}
+
+ public void addEvent(Runnable event) {
+ if ( selector != null ) {
+ synchronized (events) {
+ events.add(event);
+ }
+ selector.wakeup();
+ }
+ }
+
+ public void events() {
+ if ( events.size() == 0 ) return;
+ synchronized (events) {
+ Runnable r = null;
+ while ( (events.size() > 0) && (r = (Runnable)events.removeFirst()) != null ) {
+ try {
+ r.run();
+ } catch ( Exception x ) {
+ log.error("",x);
+ }
+ }
+ events.clear();
+ }
+ }
+
/**
* get data from channel and store in byte array
* send it to cluster
@@ -148,7 +175,7 @@
// this may block for a long time, upon return the
// selected set contains keys of the ready channels
try {
-
+ events();
int n = selector.select(getTcpSelectorTimeout());
if (n == 0) {
//there is a good chance that we got here
@@ -156,11 +183,11 @@
//selector wakeup().
//if that happens, we must ensure that that
//thread has enough time to call interestOps
- synchronized (interestOpsMutex) {
+// synchronized (interestOpsMutex) {
//if we got the lock, means there are no
//keys trying to register for the
//interestOps method
- }
+// }
continue; // nothing to do
}
// get an iterator over the set of selected keys
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReplicationThread.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReplicationThread.java?rev=418503&r1=418502&r2=418503&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReplicationThread.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReplicationThread.java Sat Jul 1 12:53:16 2006
@@ -49,9 +49,11 @@
private ByteBuffer buffer = null;
private SelectionKey key;
private int rxBufSize;
- public NioReplicationThread (ListenCallback callback)
+ private NioReceiver receiver;
+ public NioReplicationThread (ListenCallback callback, NioReceiver receiver)
{
super(callback);
+ this.receiver = receiver;
}
// loop forever waiting for work to do
@@ -131,7 +133,7 @@
* re-enables OP_READ and calls wakeup() on the selector
* so the selector will resume watching this channel.
*/
- protected void drainChannel (SelectionKey key) throws Exception {
+ protected void drainChannel (final SelectionKey key) throws Exception {
SocketChannel channel = (SocketChannel) key.channel();
int count;
buffer.clear(); // make buffer empty
@@ -188,21 +190,25 @@
}
//acquire the interestOps mutex
- Object mutex = this.getPool().getInterestOpsMutex();
- synchronized (mutex) {
- try {
- if ( key.isValid() ) {
- // cycle the selector so this key is active again
- key.selector().wakeup();
- // resume interest in OP_READ, OP_WRITE
- int resumeOps = key.interestOps() | SelectionKey.OP_READ;
- key.interestOps(resumeOps);
+ Runnable r = new Runnable() {
+ public void run() {
+ try {
+ if (key.isValid()) {
+ // cycle the selector so this key is active again
+ key.selector().wakeup();
+ // resume interest in OP_READ, OP_WRITE
+ int resumeOps = key.interestOps() | SelectionKey.OP_READ;
+ key.interestOps(resumeOps);
+ }
+ } catch (Exception x) {
+ try {
+ key.selector().close();
+ } catch (Exception ignore) {}
+ log.error("Unable to cycle the selector, connection disconnected?", x);
}
- }catch ( Exception x ) {
- try {key.selector().close();}catch ( Exception ignore){}
- log.error("Unable to cycle the selector, connection disconnected?",x);
}
- }
+ };
+ receiver.addEvent(r);
}
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioSender.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioSender.java?rev=418503&r1=418502&r2=418503&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioSender.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioSender.java Sat Jul 1 12:53:16 2006
@@ -156,7 +156,7 @@
ackbuf.append(readbuf,read);
readbuf.clear();
if (ackbuf.doesPackageExist() ) {
- byte[] ackcmd = ackbuf.extractDataPackage(true);
+ byte[] ackcmd = ackbuf.extractDataPackage(true).getBytes();
boolean ack = Arrays.equals(ackcmd,org.apache.catalina.tribes.transport.Constants.ACK_DATA);
boolean fack = Arrays.equals(ackcmd,org.apache.catalina.tribes.transport.Constants.FAIL_ACK_DATA);
if ( fack && getThrowOnFailedAck() ) throw new RemoteProcessException("Received a failed ack:org.apache.catalina.tribes.transport.Constants.FAIL_ACK_DATA");
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org