You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by fh...@apache.org on 2006/07/06 01:10:36 UTC

svn commit: r419379 - in /tomcat/container/tc5.5.x/modules/groupcom: ./ src/share/org/apache/catalina/tribes/ src/share/org/apache/catalina/tribes/io/ src/share/org/apache/catalina/tribes/transport/nio/ src/share/org/apache/catalina/tribes/util/

Author: fhanik
Date: Wed Jul  5 16:10:36 2006
New Revision: 419379

URL: http://svn.apache.org/viewvc?rev=419379&view=rev
Log:
Truly non blocking, don't send up the stack until we have processed the message

Modified:
    tomcat/container/tc5.5.x/modules/groupcom/VERSION
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/UniqueId.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ChannelData.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReplicationThread.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/util/Arrays.java

Modified: tomcat/container/tc5.5.x/modules/groupcom/VERSION
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/VERSION?rev=419379&r1=419378&r2=419379&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/VERSION (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/VERSION Wed Jul  5 16:10:36 2006
@@ -1,3 +1,6 @@
+0.9.4.7
+  - release the socket key to the poller before sending the data requests up through the channel
+    this makes receiving data non blocking, even if the application is
 0.9.4.6
   - fix package processing, the old release was hogging a thread for a single connection, making concurrency not so efficient,
     this fix uses the thread for one package, then moves on. faster concurrency, and much less memory usage in high stress environments

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/UniqueId.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/UniqueId.java?rev=419379&r1=419378&r2=419379&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/UniqueId.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/UniqueId.java Wed Jul  5 16:10:36 2006
@@ -15,7 +15,7 @@
  */
 package org.apache.catalina.tribes;
 
-import java.util.Arrays;
+import org.apache.catalina.tribes.util.Arrays;
 
 /**
  * <p>Title: Represents a globabally unique Id</p>

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ChannelData.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ChannelData.java?rev=419379&r1=419378&r2=419379&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ChannelData.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ChannelData.java Wed Jul  5 16:10:36 2006
@@ -35,6 +35,8 @@
  * 
  */
 public class ChannelData implements ChannelMessage {
+    public static ChannelData[] EMPTY_DATA_ARRAY = new ChannelData[0];
+    
     public static boolean USE_SECURE_RANDOM_FOR_UUID = false;
     
     /**

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReplicationThread.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReplicationThread.java?rev=419379&r1=419378&r2=419379&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReplicationThread.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReplicationThread.java Wed Jul  5 16:10:36 2006
@@ -28,6 +28,7 @@
 import org.apache.catalina.tribes.io.ChannelData;
 import org.apache.catalina.tribes.io.BufferPool;
 import java.nio.channels.CancelledKeyException;
+import org.apache.catalina.tribes.UniqueId;
 
 /**
  * A worker thread class which can drain channels and echo-back the input. Each
@@ -148,43 +149,61 @@
         }
 
         int pkgcnt = reader.count();
-
-        if ( pkgcnt > 0 ) {
-            ChannelMessage[] msgs = reader.execute();
-            for ( int i=0; i<msgs.length; i++ ) {
+        
+        if (count < 0 && pkgcnt == 0 ) {
+            //end of stream, and no more packages to process
+            remoteEof(key);
+            return;
+        }
+        
+        ChannelMessage[] msgs = pkgcnt == 0? ChannelData.EMPTY_DATA_ARRAY : reader.execute();
+        
+        registerForRead(key);//register to read new data, before we send it off to avoid dead locks
+        
+        for ( int i=0; i<msgs.length; i++ ) {
+            /**
+             * Use send ack here if you want to ack the request to the remote 
+             * server before completing the request
+             * This is considered an asynchronized request
+             */
+            if (ChannelData.sendAckAsync(msgs[i].getOptions())) sendAck(key,channel,Constants.ACK_COMMAND);
+            try {
+                if ( log.isTraceEnabled() ) {
+                    try {
+                        log.trace("Received msg:" + new UniqueId(msgs[i].getUniqueId()) + " at " + new java.sql.Timestamp(System.currentTimeMillis()));
+                    }catch ( Throwable t ) {}
+                }
+                //process the message
+                getCallback().messageDataReceived(msgs[i]);
                 /**
-                 * Use send ack here if you want to ack the request to the remote 
-                 * server before completing the request
-                 * This is considered an asynchronized request
+                 * Use send ack here if you want the request to complete on this 
+                 * server before sending the ack to the remote server
+                 * This is considered a synchronized request
                  */
-                if (ChannelData.sendAckAsync(msgs[i].getOptions())) sendAck(key,channel,Constants.ACK_COMMAND);
-                try {
-                    //process the message
-                    getCallback().messageDataReceived(msgs[i]);
-                    /**
-                     * Use send ack here if you want the request to complete on this 
-                     * server before sending the ack to the remote server
-                     * This is considered a synchronized request
-                     */
-                    if (ChannelData.sendAckSync(msgs[i].getOptions())) sendAck(key,channel,Constants.ACK_COMMAND);
-                }catch ( Exception e ) {
-                    log.error("Processing of cluster message failed.",e);
-                    if (ChannelData.sendAckSync(msgs[i].getOptions())) sendAck(key,channel,Constants.FAIL_ACK_COMMAND);
-                }
-                if ( getUseBufferPool() ) {
-                    BufferPool.getBufferPool().returnBuffer(msgs[i].getMessage());
-                    msgs[i].setMessage(null);
-                }
-            }                        
-        }
+                if (ChannelData.sendAckSync(msgs[i].getOptions())) sendAck(key,channel,Constants.ACK_COMMAND);
+            }catch ( Exception e ) {
+                log.error("Processing of cluster message failed.",e);
+                if (ChannelData.sendAckSync(msgs[i].getOptions())) sendAck(key,channel,Constants.FAIL_ACK_COMMAND);
+            }
+            if ( getUseBufferPool() ) {
+                BufferPool.getBufferPool().returnBuffer(msgs[i].getMessage());
+                msgs[i].setMessage(null);
+            }
+        }                        
         
         if (count < 0) {
-            // close channel on EOF, invalidates the key
-            if ( log.isDebugEnabled() ) log.debug("Channel closed on the remote end, disconnecting");
-            cancelKey(key);
+            remoteEof(key);
             return;
         }
-        
+    }
+
+    private void remoteEof(SelectionKey key) {
+        // close channel on EOF, invalidates the key
+        if ( log.isDebugEnabled() ) log.debug("Channel closed on the remote end, disconnecting");
+        cancelKey(key);
+    }
+
+    protected void registerForRead(final SelectionKey key) {
         //register our OP_READ interest
         Runnable r = new Runnable() {
             public void run() {
@@ -207,7 +226,6 @@
             }
         };
         receiver.addEvent(r);
-        
     }
 
     private void cancelKey(final SelectionKey key) {

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/util/Arrays.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/util/Arrays.java?rev=419379&r1=419378&r2=419379&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/util/Arrays.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/util/Arrays.java Wed Jul  5 16:10:36 2006
@@ -107,6 +107,10 @@
         return new UniqueId(data);
     }
     
+    public static boolean equals(byte[] o1, byte[] o2) {
+        return java.util.Arrays.equals(o1,o2);
+    }
+
     public static boolean equals(Object[] o1, Object[] o2) {
         boolean result = o1.length == o2.length;
         if ( result ) for (int i=0; i<o1.length && result; i++ ) result = o1[i].equals(o2[i]);
@@ -171,6 +175,19 @@
 //System.out.println("Members:"+toNameString(members));
         return idx;
     }
+    
+    public static int hashCode(byte a[]) {
+        if (a == null)
+            return 0;
+
+        int result = 1;
+        for (int i=0; i<a.length; i++) {
+            byte element = a[i];
+            result = 31 * result + element;
+        }
+        return result;
+    }
+
 
     
     



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org