You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by fh...@apache.org on 2006/07/06 01:10:36 UTC
svn commit: r419379 - in /tomcat/container/tc5.5.x/modules/groupcom: ./
src/share/org/apache/catalina/tribes/
src/share/org/apache/catalina/tribes/io/
src/share/org/apache/catalina/tribes/transport/nio/
src/share/org/apache/catalina/tribes/util/
Author: fhanik
Date: Wed Jul 5 16:10:36 2006
New Revision: 419379
URL: http://svn.apache.org/viewvc?rev=419379&view=rev
Log:
Truly non blocking, don't send up the stack until we have processed the message
Modified:
tomcat/container/tc5.5.x/modules/groupcom/VERSION
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/UniqueId.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ChannelData.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReplicationThread.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/util/Arrays.java
Modified: tomcat/container/tc5.5.x/modules/groupcom/VERSION
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/VERSION?rev=419379&r1=419378&r2=419379&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/VERSION (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/VERSION Wed Jul 5 16:10:36 2006
@@ -1,3 +1,6 @@
+0.9.4.7
+ - release the socket key to the poller before sending the data requests up through the channel
+ this makes receiving data non blocking, even if the application is
0.9.4.6
- fix package processing, the old release was hogging a thread for a single connection, making concurrency not so efficient,
this fix uses the thread for one package, then moves on. faster concurrency, and much less memory usage in high stress environments
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/UniqueId.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/UniqueId.java?rev=419379&r1=419378&r2=419379&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/UniqueId.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/UniqueId.java Wed Jul 5 16:10:36 2006
@@ -15,7 +15,7 @@
*/
package org.apache.catalina.tribes;
-import java.util.Arrays;
+import org.apache.catalina.tribes.util.Arrays;
/**
* <p>Title: Represents a globabally unique Id</p>
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ChannelData.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ChannelData.java?rev=419379&r1=419378&r2=419379&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ChannelData.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ChannelData.java Wed Jul 5 16:10:36 2006
@@ -35,6 +35,8 @@
*
*/
public class ChannelData implements ChannelMessage {
+ public static ChannelData[] EMPTY_DATA_ARRAY = new ChannelData[0];
+
public static boolean USE_SECURE_RANDOM_FOR_UUID = false;
/**
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReplicationThread.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReplicationThread.java?rev=419379&r1=419378&r2=419379&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReplicationThread.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReplicationThread.java Wed Jul 5 16:10:36 2006
@@ -28,6 +28,7 @@
import org.apache.catalina.tribes.io.ChannelData;
import org.apache.catalina.tribes.io.BufferPool;
import java.nio.channels.CancelledKeyException;
+import org.apache.catalina.tribes.UniqueId;
/**
* A worker thread class which can drain channels and echo-back the input. Each
@@ -148,43 +149,61 @@
}
int pkgcnt = reader.count();
-
- if ( pkgcnt > 0 ) {
- ChannelMessage[] msgs = reader.execute();
- for ( int i=0; i<msgs.length; i++ ) {
+
+ if (count < 0 && pkgcnt == 0 ) {
+ //end of stream, and no more packages to process
+ remoteEof(key);
+ return;
+ }
+
+ ChannelMessage[] msgs = pkgcnt == 0? ChannelData.EMPTY_DATA_ARRAY : reader.execute();
+
+ registerForRead(key);//register to read new data, before we send it off to avoid dead locks
+
+ for ( int i=0; i<msgs.length; i++ ) {
+ /**
+ * Use send ack here if you want to ack the request to the remote
+ * server before completing the request
+ * This is considered an asynchronized request
+ */
+ if (ChannelData.sendAckAsync(msgs[i].getOptions())) sendAck(key,channel,Constants.ACK_COMMAND);
+ try {
+ if ( log.isTraceEnabled() ) {
+ try {
+ log.trace("Received msg:" + new UniqueId(msgs[i].getUniqueId()) + " at " + new java.sql.Timestamp(System.currentTimeMillis()));
+ }catch ( Throwable t ) {}
+ }
+ //process the message
+ getCallback().messageDataReceived(msgs[i]);
/**
- * Use send ack here if you want to ack the request to the remote
- * server before completing the request
- * This is considered an asynchronized request
+ * Use send ack here if you want the request to complete on this
+ * server before sending the ack to the remote server
+ * This is considered a synchronized request
*/
- if (ChannelData.sendAckAsync(msgs[i].getOptions())) sendAck(key,channel,Constants.ACK_COMMAND);
- try {
- //process the message
- getCallback().messageDataReceived(msgs[i]);
- /**
- * Use send ack here if you want the request to complete on this
- * server before sending the ack to the remote server
- * This is considered a synchronized request
- */
- if (ChannelData.sendAckSync(msgs[i].getOptions())) sendAck(key,channel,Constants.ACK_COMMAND);
- }catch ( Exception e ) {
- log.error("Processing of cluster message failed.",e);
- if (ChannelData.sendAckSync(msgs[i].getOptions())) sendAck(key,channel,Constants.FAIL_ACK_COMMAND);
- }
- if ( getUseBufferPool() ) {
- BufferPool.getBufferPool().returnBuffer(msgs[i].getMessage());
- msgs[i].setMessage(null);
- }
- }
- }
+ if (ChannelData.sendAckSync(msgs[i].getOptions())) sendAck(key,channel,Constants.ACK_COMMAND);
+ }catch ( Exception e ) {
+ log.error("Processing of cluster message failed.",e);
+ if (ChannelData.sendAckSync(msgs[i].getOptions())) sendAck(key,channel,Constants.FAIL_ACK_COMMAND);
+ }
+ if ( getUseBufferPool() ) {
+ BufferPool.getBufferPool().returnBuffer(msgs[i].getMessage());
+ msgs[i].setMessage(null);
+ }
+ }
if (count < 0) {
- // close channel on EOF, invalidates the key
- if ( log.isDebugEnabled() ) log.debug("Channel closed on the remote end, disconnecting");
- cancelKey(key);
+ remoteEof(key);
return;
}
-
+ }
+
+ private void remoteEof(SelectionKey key) {
+ // close channel on EOF, invalidates the key
+ if ( log.isDebugEnabled() ) log.debug("Channel closed on the remote end, disconnecting");
+ cancelKey(key);
+ }
+
+ protected void registerForRead(final SelectionKey key) {
//register our OP_READ interest
Runnable r = new Runnable() {
public void run() {
@@ -207,7 +226,6 @@
}
};
receiver.addEvent(r);
-
}
private void cancelKey(final SelectionKey key) {
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/util/Arrays.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/util/Arrays.java?rev=419379&r1=419378&r2=419379&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/util/Arrays.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/util/Arrays.java Wed Jul 5 16:10:36 2006
@@ -107,6 +107,10 @@
return new UniqueId(data);
}
+ public static boolean equals(byte[] o1, byte[] o2) {
+ return java.util.Arrays.equals(o1,o2);
+ }
+
public static boolean equals(Object[] o1, Object[] o2) {
boolean result = o1.length == o2.length;
if ( result ) for (int i=0; i<o1.length && result; i++ ) result = o1[i].equals(o2[i]);
@@ -171,6 +175,19 @@
//System.out.println("Members:"+toNameString(members));
return idx;
}
+
+ public static int hashCode(byte a[]) {
+ if (a == null)
+ return 0;
+
+ int result = 1;
+ for (int i=0; i<a.length; i++) {
+ byte element = a[i];
+ result = 31 * result + element;
+ }
+ return result;
+ }
+
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org