You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by fh...@apache.org on 2006/07/01 21:53:17 UTC

svn commit: r418503 - in /tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes: io/ transport/ transport/bio/ transport/nio/

Author: fhanik
Date: Sat Jul  1 12:53:16 2006
New Revision: 418503

URL: http://svn.apache.org/viewvc?rev=418503&view=rev
Log:
Improved NioReceiver by almost 50% in performance, handles concurrency much better now

Modified:
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ChannelData.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/ReceiverBase.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/BioReceiver.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/BioSender.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReceiver.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReplicationThread.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioSender.java

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ChannelData.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ChannelData.java?rev=418503&r1=418502&r2=418503&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ChannelData.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ChannelData.java Sat Jul  1 12:53:16 2006
@@ -215,6 +215,30 @@
      * @param b byte[]
      * @return ChannelData
      */
+    public static ChannelData getDataFromPackage(XByteBuffer xbuf)  {
+        ChannelData data = new ChannelData(false);
+        int offset = 0;
+        data.setOptions(XByteBuffer.toInt(xbuf.getBytesDirect(),offset));
+        offset += 4; //options
+        data.setTimestamp(XByteBuffer.toLong(xbuf.getBytesDirect(),offset));
+        offset += 8; //timestamp
+        data.uniqueId = new byte[XByteBuffer.toInt(xbuf.getBytesDirect(),offset)];
+        offset += 4; //uniqueId length
+        System.arraycopy(xbuf.getBytesDirect(),offset,data.uniqueId,0,data.uniqueId.length);
+        offset += data.uniqueId.length; //uniqueId data
+        byte[] addr = new byte[XByteBuffer.toInt(xbuf.getBytesDirect(),offset)];
+        offset += 4; //addr length
+        System.arraycopy(xbuf.getBytesDirect(),offset,addr,0,addr.length);
+        data.setAddress(MemberImpl.getMember(addr));
+        offset += addr.length; //addr data
+        int xsize = XByteBuffer.toInt(xbuf.getBytesDirect(),offset);
+        System.arraycopy(xbuf.getBytesDirect(),offset,xbuf.getBytesDirect(),0,xsize);
+        xbuf.setLength(xsize);
+        data.message = xbuf;
+        return data;
+
+    }
+
     public static ChannelData getDataFromPackage(byte[] b)  {
         ChannelData data = new ChannelData(false);
         int offset = 0;

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java?rev=418503&r1=418502&r2=418503&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java Sat Jul  1 12:53:16 2006
@@ -113,6 +113,11 @@
     public int getLength() {
         return bufSize;
     }
+
+    public void setLength(int size) {
+        if ( size > buf.length ) throw new ArrayIndexOutOfBoundsException("Size is larger than existing buffer.");
+        bufSize = size;
+    }
     
     public void trim(int length) {
         if ( (bufSize - length) < 0 ) 
@@ -307,24 +312,24 @@
      * @param clearFromBuffer - if true, the package will be removed from the byte buffer
      * @return - returns the actual message bytes (header, compress,size and footer not included).
      */
-    public byte[] extractDataPackage(boolean clearFromBuffer) {
+    public XByteBuffer extractDataPackage(boolean clearFromBuffer) {
         int psize = countPackages(true);
         if (psize == 0) throw new java.lang.IllegalStateException("No package exists in XByteBuffer");
         int size = toInt(buf, START_DATA.length);
-        byte[] data = new byte[size];
-        System.arraycopy(buf, START_DATA.length + 4, data, 0, size);
+        XByteBuffer xbuf = BufferPool.getBufferPool().getBuffer(size,false);
+        System.arraycopy(buf, START_DATA.length + 4, xbuf.getBytesDirect(), 0, size);
         if (clearFromBuffer) {
             int totalsize = START_DATA.length + 4 + size + END_DATA.length;
             bufSize = bufSize - totalsize;
             System.arraycopy(buf, totalsize, buf, 0, bufSize);
         }
-        return data;
+        return xbuf;
 
     }
     
     public ChannelData extractPackage(boolean clearFromBuffer) throws java.io.IOException {
-        byte[] data = extractDataPackage(clearFromBuffer);
-        ChannelData cdata = ChannelData.getDataFromPackage(data);
+        XByteBuffer xbuf = extractDataPackage(clearFromBuffer);
+        ChannelData cdata = ChannelData.getDataFromPackage(xbuf);
         return cdata;
     }
     

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/ReceiverBase.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/ReceiverBase.java?rev=418503&r1=418502&r2=418503&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/ReceiverBase.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/ReceiverBase.java Sat Jul  1 12:53:16 2006
@@ -54,7 +54,7 @@
     private boolean listen = false;
     private ThreadPool pool;
     private boolean direct = true;
-    private long tcpSelectorTimeout = 100;
+    private long tcpSelectorTimeout = 5000;
     //how many times to search for an available socket
     private int autoBind = 10;
     private int maxThreads = 25;

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/BioReceiver.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/BioReceiver.java?rev=418503&r1=418502&r2=418503&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/BioReceiver.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/BioReceiver.java Sat Jul  1 12:53:16 2006
@@ -54,7 +54,7 @@
      */
     public void start() throws IOException {
         try {
-            setPool(new ThreadPool(new Object(),getMaxThreads(),getMinThreads(),this));
+            setPool(new ThreadPool(getMaxThreads(),getMinThreads(),this));
         } catch (Exception x) {
             log.fatal("ThreadPool can initilzed. Listener not started", x);
             if ( x instanceof IOException ) throw (IOException)x;

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/BioSender.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/BioSender.java?rev=418503&r1=418502&r2=418503&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/BioSender.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/BioSender.java Sat Jul  1 12:53:16 2006
@@ -275,7 +275,7 @@
                 byte d = (byte)i;
                 ackbuf.append(d);
                 if (ackbuf.doesPackageExist() ) {
-                    byte[] ackcmd = ackbuf.extractDataPackage(true);
+                    byte[] ackcmd = ackbuf.extractDataPackage(true).getBytes();
                     ackReceived = Arrays.equals(ackcmd,org.apache.catalina.tribes.transport.Constants.ACK_DATA);
                     failAckReceived = Arrays.equals(ackcmd,org.apache.catalina.tribes.transport.Constants.FAIL_ACK_DATA);
                     ackReceived = ackReceived || failAckReceived;

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReceiver.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReceiver.java?rev=418503&r1=418502&r2=418503&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReceiver.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReceiver.java Sat Jul  1 12:53:16 2006
@@ -33,6 +33,7 @@
 import org.apache.catalina.tribes.transport.ThreadPool;
 import org.apache.catalina.tribes.transport.WorkerThread;
 import org.apache.catalina.tribes.util.StringManager;
+import java.util.LinkedList;
 
 /**
  * @author Filip Hanik
@@ -55,8 +56,8 @@
     private Selector selector = null;
     private ServerSocketChannel serverChannel = null;
 
-
-    private Object interestOpsMutex = new Object();
+    protected LinkedList events = new LinkedList();
+//    private Object interestOpsMutex = new Object();
 
     public NioReceiver() {
     }
@@ -70,9 +71,9 @@
         return (info);
     }
 
-    public Object getInterestOpsMutex() {
-        return interestOpsMutex;
-    }
+//    public Object getInterestOpsMutex() {
+//        return interestOpsMutex;
+//    }
 
     public void stop() {
         this.stopListening();
@@ -85,7 +86,8 @@
      */
     public void start() throws IOException {
         try {
-            setPool(new ThreadPool(interestOpsMutex, getMaxThreads(),getMinThreads(),this));
+//            setPool(new ThreadPool(interestOpsMutex, getMaxThreads(),getMinThreads(),this));
+            setPool(new ThreadPool(getMaxThreads(),getMinThreads(),this));
         } catch (Exception x) {
             log.fatal("ThreadPool can initilzed. Listener not started", x);
             if ( x instanceof IOException ) throw (IOException)x;
@@ -105,7 +107,7 @@
     }
     
     public WorkerThread getWorkerThread() {
-        NioReplicationThread thread = new NioReplicationThread(this);
+        NioReplicationThread thread = new NioReplicationThread(this,this);
         thread.setUseBufferPool(this.getUseBufferPool());
         thread.setRxBufSize(getRxBufSize());
         thread.setOptions(getWorkerThreadOptions());
@@ -130,6 +132,31 @@
         serverChannel.register(selector, SelectionKey.OP_ACCEPT);
         
     }
+    
+    public void addEvent(Runnable event) {
+        if ( selector != null ) {
+            synchronized (events) {
+                events.add(event);
+            }
+            selector.wakeup();
+        }
+    }
+
+    public void events() {
+        if ( events.size() == 0 ) return;
+        synchronized (events) {
+            Runnable r = null;
+            while ( (events.size() > 0) && (r = (Runnable)events.removeFirst()) != null ) {
+                try {
+                    r.run();
+                } catch ( Exception x ) {
+                    log.error("",x);
+                }
+            }
+            events.clear();
+        }
+    }
+
     /**
      * get data from channel and store in byte array
      * send it to cluster
@@ -148,7 +175,7 @@
             // this may block for a long time, upon return the
             // selected set contains keys of the ready channels
             try {
-
+                events();
                 int n = selector.select(getTcpSelectorTimeout());
                 if (n == 0) {
                     //there is a good chance that we got here
@@ -156,11 +183,11 @@
                     //selector wakeup().
                     //if that happens, we must ensure that that
                     //thread has enough time to call interestOps
-                    synchronized (interestOpsMutex) {
+//                    synchronized (interestOpsMutex) {
                         //if we got the lock, means there are no
                         //keys trying to register for the
                         //interestOps method
-                    }
+//                    }
                     continue; // nothing to do
                 }
                 // get an iterator over the set of selected keys

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReplicationThread.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReplicationThread.java?rev=418503&r1=418502&r2=418503&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReplicationThread.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReplicationThread.java Sat Jul  1 12:53:16 2006
@@ -49,9 +49,11 @@
     private ByteBuffer buffer = null;
     private SelectionKey key;
     private int rxBufSize;
-    public NioReplicationThread (ListenCallback callback)
+    private NioReceiver receiver;
+    public NioReplicationThread (ListenCallback callback, NioReceiver receiver)
     {
         super(callback);
+        this.receiver = receiver;
     }
 
     // loop forever waiting for work to do
@@ -131,7 +133,7 @@
      * re-enables OP_READ and calls wakeup() on the selector
      * so the selector will resume watching this channel.
      */
-    protected void drainChannel (SelectionKey key) throws Exception {
+    protected void drainChannel (final SelectionKey key) throws Exception {
         SocketChannel channel = (SocketChannel) key.channel();
         int count;
         buffer.clear();			// make buffer empty
@@ -188,21 +190,25 @@
         }
         
         //acquire the interestOps mutex
-        Object mutex = this.getPool().getInterestOpsMutex();
-        synchronized (mutex) {
-            try {
-                if ( key.isValid() ) {
-                    // cycle the selector so this key is active again
-                    key.selector().wakeup();
-                    // resume interest in OP_READ, OP_WRITE
-                    int resumeOps = key.interestOps() | SelectionKey.OP_READ;
-                    key.interestOps(resumeOps);
+        Runnable r = new Runnable() {
+            public void run() {
+                try {
+                    if (key.isValid()) {
+                        // cycle the selector so this key is active again
+                        key.selector().wakeup();
+                        // resume interest in OP_READ, OP_WRITE
+                        int resumeOps = key.interestOps() | SelectionKey.OP_READ;
+                        key.interestOps(resumeOps);
+                    }
+                } catch (Exception x) {
+                    try {
+                        key.selector().close();
+                    } catch (Exception ignore) {}
+                    log.error("Unable to cycle the selector, connection disconnected?", x);
                 }
-            }catch ( Exception x ) {
-                try {key.selector().close();}catch ( Exception ignore){}
-                log.error("Unable to cycle the selector, connection disconnected?",x);
             }
-        }
+        };
+        receiver.addEvent(r);
         
     }
     

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioSender.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioSender.java?rev=418503&r1=418502&r2=418503&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioSender.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioSender.java Sat Jul  1 12:53:16 2006
@@ -156,7 +156,7 @@
         ackbuf.append(readbuf,read);
         readbuf.clear();
         if (ackbuf.doesPackageExist() ) {
-            byte[] ackcmd = ackbuf.extractDataPackage(true);
+            byte[] ackcmd = ackbuf.extractDataPackage(true).getBytes();
             boolean ack = Arrays.equals(ackcmd,org.apache.catalina.tribes.transport.Constants.ACK_DATA);
             boolean fack = Arrays.equals(ackcmd,org.apache.catalina.tribes.transport.Constants.FAIL_ACK_DATA);
             if ( fack && getThrowOnFailedAck() ) throw new RemoteProcessException("Received a failed ack:org.apache.catalina.tribes.transport.Constants.FAIL_ACK_DATA");



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org