You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by fh...@apache.org on 2006/03/01 00:31:26 UTC
svn commit: r381827 - in /tomcat/container/tc5.5.x/modules/groupcom:
src/share/org/apache/catalina/tribes/group/interceptors/FragmentationInterceptor.java
to-do.txt
Author: fhanik
Date: Tue Feb 28 15:31:26 2006
New Revision: 381827
URL: http://svn.apache.org/viewcvs?rev=381827&view=rev
Log:
Cleaned up synchronization for the frag interceptor, only need to synchronize on write to the map, not read.
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/FragmentationInterceptor.java
tomcat/container/tc5.5.x/modules/groupcom/to-do.txt
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/FragmentationInterceptor.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/FragmentationInterceptor.java?rev=381827&r1=381826&r2=381827&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/FragmentationInterceptor.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/FragmentationInterceptor.java Tue Feb 28 15:31:26 2006
@@ -24,6 +24,9 @@
import org.apache.catalina.tribes.Member;
import org.apache.catalina.tribes.group.ChannelInterceptorBase;
import org.apache.catalina.tribes.io.XByteBuffer;
+import java.util.Collection;
+import java.util.ArrayList;
+import java.util.Set;
/**
*
@@ -39,6 +42,8 @@
* @version 1.0
*/
public class FragmentationInterceptor extends ChannelInterceptorBase {
+ private static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog( FragmentationInterceptor.class );
+
protected HashMap fragpieces = new HashMap();
private int maxSize = 1024*100;
private long expire = 1000 * 60; //one minute expiration
@@ -68,16 +73,21 @@
}
- public synchronized FragCollection getFragCollection(FragKey key, ChannelMessage msg) {
+ public FragCollection getFragCollection(FragKey key, ChannelMessage msg) {
FragCollection coll = (FragCollection)fragpieces.get(key);
if ( coll == null ) {
- coll = new FragCollection(msg);
- fragpieces.put(key,coll);
+ synchronized (fragpieces) {
+ coll = (FragCollection)fragpieces.get(key);
+ if ( coll == null ) {
+ coll = new FragCollection(msg);
+ fragpieces.put(key, coll);
+ }
+ }
}
return coll;
}
- public synchronized void removeFragCollection(FragKey key) {
+ public void removeFragCollection(FragKey key) {
fragpieces.remove(key);
}
@@ -120,6 +130,23 @@
for ( int i=0; i<messages.length; i++ ) {
super.sendMessage(destination,messages[i],payload);
}
+ }
+
+ public void heartbeat() {
+ try {
+ Set set = fragpieces.keySet();
+ Object[] keys = set.toArray();
+ for ( int i=0; i<keys.length; i++ ) {
+ FragKey key = (FragKey)keys[i];
+ if ( key != null && key.expired(getExpire()) )
+ removeFragCollection(key);
+ }
+ }catch ( Exception x ) {
+ if ( log.isErrorEnabled() ) {
+ log.error("Unable to perform heartbeat clean up in the frag interceptor",x);
+ }
+ }
+ super.heartbeat();
}
Modified: tomcat/container/tc5.5.x/modules/groupcom/to-do.txt
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/to-do.txt?rev=381827&r1=381826&r2=381827&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/to-do.txt (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/to-do.txt Tue Feb 28 15:31:26 2006
@@ -1,24 +1,25 @@
To Do:
+===========================================
+
+Documentation:
+===========================================
-Tasks:
-4. ChannelMessage.getMessage should return streamable, that way we can wrap,
-pass it around and all those good things without having to copy byte arrays
-left and right
-5. NIO and IO DataSender, since the IO is blocking
-Interceptors to create
-1. OrderInterceptor - guarantees the order of messages
-2. WaitForCompletionInterceptor - waits for the message to get processed by all receivers before returning
+Code Tasks:
+===========================================
+
+6. NIO and IO DataSender, since the IO is blocking
+
+8. WaitForCompletionInterceptor - waits for the message to get processed by all receivers before returning
(This is useful when synchronized=false and waitForAck=false, to improve
parallel processing, but you want to have all messages sent in parallel and
don't return until all have been processed on the remote end.)
-3. FragmentationInterceptor - splits up messages that are larger than X bytes.
-4. CoordinatorInterceptor - manages the selection of a cluster coordinator
-5. VirtualSynchronyInterceptor - not sure we want to build this one, it would be
-pretty slow, but it would guarantee that all messages were received, to the
-members in that group in that time.
+
+9. CoordinatorInterceptor - manages the selection of a cluster coordinator
+10. Xa2PhaseCommitInterceptor - make sure the message doesn't reach the receiver unless all members got it
Tasks Completed
+===========================================
1. True synchronized/asynchronized replication enabled using flags
Sender.sendAck/Receiver.waitForAck/Receiver.synchronized
Task Desc: waitForAck - should only mean, we received the message, not for the
@@ -27,9 +28,22 @@
Status: Complete
Notes:
-
2. Unique id, send it in byte array instead of string
3. DataSender or ReplicationTransmitter swallows IOException, this should be
Notes: This has only been fixed for the pooled synchronized. the fastasynch
aint working that well
+
+4. ChannelMessage.getMessage should return streamable, that way we can wrap,
+pass it around and all those good things without having to copy byte arrays
+left and right
+Notes: Instead of using a streamable, this is implemented using the XByteBuffer,
+ which is very easy to use. It also becomes a single spot for optimizations.
+ Ideally, there would be a pool of XByteBuffers, that all use direct ByteBuffers
+ for its data handling.
+
+5. OrderInterceptor - guarantees the order of messages
+Notes: completed
+
+7. FragmentationInterceptor - splits up messages that are larger than X bytes.
+Notes: complated
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org