You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by fh...@apache.org on 2006/03/21 17:07:14 UTC

svn commit: r387560 - in /tomcat/container/tc5.5.x/modules/groupcom: ./ src/share/org/apache/catalina/tribes/ src/share/org/apache/catalina/tribes/group/interceptors/ src/share/org/apache/catalina/tribes/tcp/bio/util/ src/share/org/apache/catalina/trib...

Author: fhanik
Date: Tue Mar 21 08:07:12 2006
New Revision: 387560

URL: http://svn.apache.org/viewcvs?rev=387560&view=rev
Log:
Added documentation for the ReplicatedMapEntry and added the async replication to the demos

Modified:
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelMessage.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/MessageDispatchInterceptor.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/util/FastQueue.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/ReplicatedMap.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/ReplicatedMapEntry.java
    tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/ChannelCreator.java
    tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/LoadTest.java
    tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/MapDemo.java
    tomcat/container/tc5.5.x/modules/groupcom/to-do.txt

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelMessage.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelMessage.java?rev=387560&r1=387559&r2=387560&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelMessage.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelMessage.java Tue Mar 21 08:07:12 2006
@@ -72,6 +72,15 @@
     public int getOptions();
     public void setOptions(int options);
     
+    /**
+     * Shallow clone, only the actual message(getMessage()) is cloned, the rest remains as references
+     * @return ChannelMessage
+     */
     public ChannelMessage clone();
 
+    /**
+     * Deep clone, everything gets cloned
+     * @return ChannelMessage
+     */
+    public ChannelMessage deepclone();
 }

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/MessageDispatchInterceptor.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/MessageDispatchInterceptor.java?rev=387560&r1=387559&r2=387560&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/MessageDispatchInterceptor.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/MessageDispatchInterceptor.java Tue Mar 21 08:07:12 2006
@@ -15,6 +15,8 @@
 
 package org.apache.catalina.tribes.group.interceptors;
 
+import java.util.concurrent.atomic.AtomicLong;
+
 import org.apache.catalina.tribes.Channel;
 import org.apache.catalina.tribes.ChannelException;
 import org.apache.catalina.tribes.ChannelMessage;
@@ -23,7 +25,6 @@
 import org.apache.catalina.tribes.group.InterceptorPayload;
 import org.apache.catalina.tribes.tcp.bio.util.FastQueue;
 import org.apache.catalina.tribes.tcp.bio.util.LinkObject;
-import java.util.concurrent.atomic.AtomicLong;
 
 /**
  *
@@ -44,22 +45,23 @@
     private boolean run = false;
     private Thread msgDispatchThread = null;
     private AtomicLong currentSize = new AtomicLong(0);
-
+    private boolean useDeepClone = false;
 
     public void sendMessage(Member[] destination, ChannelMessage msg, InterceptorPayload payload) throws ChannelException {
         boolean async = (msg.getOptions() & Channel.SEND_OPTIONS_ASYNCHRONOUS) == Channel.SEND_OPTIONS_ASYNCHRONOUS;
         if ( async && run ) {
             if ( (currentSize.get()+msg.getMessage().getLength()) > maxQueueSize ) throw new ChannelException("Asynchronous queue is full, reached its limit of "+maxQueueSize+" bytes, current:"+currentSize+" bytes.");
             //add to queue
-            queue.add(msg, destination, payload);
+            if ( useDeepClone ) msg = msg.deepclone();
+            if (!queue.add(msg, destination, payload) ) {
+                throw new ChannelException("Unable to add the message to the async queue, queue bug?");
+            }
             currentSize.addAndGet(msg.getMessage().getLength());
         } else {
             super.sendMessage(destination, msg, payload);
         }
     }
     
-    
-
     public void messageReceived(ChannelMessage msg) {
         super.messageReceived(msg);
     }
@@ -78,10 +80,18 @@
         this.maxQueueSize = maxQueueSize;
     }
 
+    public void setUseDeepClone(boolean useDeepClone) {
+        this.useDeepClone = useDeepClone;
+    }
+
     public long getMaxQueueSize() {
         return maxQueueSize;
     }
-    
+
+    public boolean getUseDeepClone() {
+        return useDeepClone;
+    }
+
     public void start(int svc) throws ChannelException {
         //start the thread
         if (!run ) {

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/util/FastQueue.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/util/FastQueue.java?rev=387560&r1=387559&r2=387560&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/util/FastQueue.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/util/FastQueue.java Tue Mar 21 08:07:12 2006
@@ -174,6 +174,7 @@
         enabled = enable;
         if (!enabled) {
             lock.abortRemove();
+            last = first = null;
         }
     }
 

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java?rev=387560&r1=387559&r2=387560&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java Tue Mar 21 08:07:12 2006
@@ -55,6 +55,16 @@
 public abstract class AbstractReplicatedMap extends LinkedHashMap implements RpcCallback, ChannelListener, MembershipListener {
     protected static Log log = LogFactory.getLog(AbstractReplicatedMap.class);
 
+    /**
+     * The default initial capacity - MUST be a power of two.
+     */
+    public static final int DEFAULT_INITIAL_CAPACITY = 16;
+
+    /**
+     * The load factor used when none specified in constructor.
+     **/
+    public static final float DEFAULT_LOAD_FACTOR = 0.75f;
+
 //------------------------------------------------------------------------------
 //              INSTANCE VARIABLES
 //------------------------------------------------------------------------------
@@ -65,8 +75,8 @@
     private transient boolean stateTransferred = false;
     private transient Object stateMutex = new Object();
     private transient ArrayList mapMembers = new ArrayList();
-    
     private transient int channelSendOptions = Channel.SEND_OPTIONS_DEFAULT;
+    private transient Object mapOwner;
 
 //------------------------------------------------------------------------------
 //              CONSTRUCTORS
@@ -80,52 +90,24 @@
      * @param initialCapacity int - the size of this map, see HashMap
      * @param loadFactor float - load factor, see HashMap
      */
-    public AbstractReplicatedMap(Channel channel, 
+    public AbstractReplicatedMap(Object owner,
+                                 Channel channel, 
                                  long timeout, 
                                  String mapContextName, 
                                  int initialCapacity,
                                  float loadFactor,
                                  int channelSendOptions) {
         super(initialCapacity, loadFactor);
-        init(channel, mapContextName, timeout, channelSendOptions);
+        init(owner, channel, mapContextName, timeout, channelSendOptions);
         
     }
 
-    /**
-     * Creates a new map
-     * @param channel The channel to use for communication
-     * @param timeout long - timeout for RPC messags
-     * @param mapContextName String - unique name for this map, to allow multiple maps per channel
-     * @param initialCapacity int - the size of this map, see HashMap
-     */
-    public AbstractReplicatedMap(Channel channel, 
-                                 long timeout, 
-                                 String mapContextName, 
-                                 int initialCapacity,
-                                 int channelSendOptions) {
-        super(initialCapacity);
-        init(channel, mapContextName, timeout, channelSendOptions);
-    }
-
-    /**
-     * Creates a new map
-     * @param channel The channel to use for communication
-     * @param timeout long - timeout for RPC messags
-     * @param mapContextName String - unique name for this map, to allow multiple maps per channel
-     */
-    public AbstractReplicatedMap(Channel channel, 
-                                 long timeout, 
-                                 String mapContextName,
-                                 int channelSendOptions) {
-        super();
-        init(channel, mapContextName, timeout, channelSendOptions);
-    }
-    
     protected Member[] wrap(Member m) {
         return new Member[] {m};
     }
 
-    private void init(Channel channel, String mapContextName, long timeout, int channelSendOptions) {
+    private void init(Object owner, Channel channel, String mapContextName, long timeout, int channelSendOptions) {
+        this.mapOwner = owner;
         final String chset = "ISO-8859-1";
         this.channelSendOptions = channelSendOptions;
         this.channel = channel;
@@ -277,9 +259,12 @@
 
                         //make sure we don't store that actual object as primary or backup
                         MapEntry local = (MapEntry)super.get(m.getKey());
-                        if (local != null && (!local.isProxy()))continue;
+                        if (local != null && (!local.isProxy())) continue;
 
                         //store the object
+                        if (m.getValue()!=null && m.getValue() instanceof ReplicatedMapEntry ) {
+                            ((ReplicatedMapEntry)m.getValue()).setOwner(getMapOwner());
+                        }
                         MapEntry entry = new MapEntry(m.getKey(), m.getValue());
                         entry.setBackup(false);
                         entry.setProxy(true);
@@ -390,6 +375,9 @@
                 entry.setBackup(true);
                 entry.setProxy(false);
                 entry.setBackupNodes(mapmsg.getBackupNodes());
+                if (mapmsg.getValue()!=null && mapmsg.getValue() instanceof ReplicatedMapEntry ) {
+                    ((ReplicatedMapEntry)mapmsg.getValue()).setOwner(getMapOwner());
+                }
                 super.put(entry.getKey(), entry);
             } else {
                 entry.setBackup(true);
@@ -400,7 +388,7 @@
                     if (mapmsg.isDiff()) {
                         try {
                             diff.applyDiff(mapmsg.getDiffValue(), 0, mapmsg.getDiffValue().length);
-                        } catch (IOException x) {
+                        } catch (Exception x) {
                             log.error("Unable to apply diff to key:" + entry.getKey(), x);
                         }
                     } else {
@@ -865,6 +853,14 @@
 
     public boolean isStateTransferred() {
         return stateTransferred;
+    }
+
+    public Object getMapOwner() {
+        return mapOwner;
+    }
+
+    public void setMapOwner(Object mapOwner) {
+        this.mapOwner = mapOwner;
     }
 
 }

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java?rev=387560&r1=387559&r2=387560&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java Tue Mar 21 08:07:12 2006
@@ -84,8 +84,8 @@
          * @param initialCapacity int - the size of this map, see HashMap
          * @param loadFactor float - load factor, see HashMap
          */
-        public LazyReplicatedMap(Channel channel, long timeout, String mapContextName, int initialCapacity, float loadFactor) {
-            super(channel,timeout,mapContextName,initialCapacity,loadFactor, Channel.SEND_OPTIONS_DEFAULT);
+        public LazyReplicatedMap(Object owner, Channel channel, long timeout, String mapContextName, int initialCapacity, float loadFactor) {
+            super(owner,channel,timeout,mapContextName,initialCapacity,loadFactor, Channel.SEND_OPTIONS_DEFAULT);
         }
 
         /**
@@ -95,8 +95,8 @@
          * @param mapContextName String - unique name for this map, to allow multiple maps per channel
          * @param initialCapacity int - the size of this map, see HashMap
          */
-        public LazyReplicatedMap(Channel channel, long timeout, String mapContextName, int initialCapacity) {
-            super(channel,timeout,mapContextName,initialCapacity, Channel.SEND_OPTIONS_DEFAULT);
+        public LazyReplicatedMap(Object owner, Channel channel, long timeout, String mapContextName, int initialCapacity) {
+            super(owner, channel,timeout,mapContextName,initialCapacity, LazyReplicatedMap.DEFAULT_LOAD_FACTOR, Channel.SEND_OPTIONS_DEFAULT);
         }
 
         /**
@@ -105,8 +105,8 @@
          * @param timeout long - timeout for RPC messags
          * @param mapContextName String - unique name for this map, to allow multiple maps per channel
          */
-        public LazyReplicatedMap(Channel channel, long timeout, String mapContextName) {
-            super(channel,timeout,mapContextName, Channel.SEND_OPTIONS_DEFAULT);
+        public LazyReplicatedMap(Object owner, Channel channel, long timeout, String mapContextName) {
+            super(owner, channel,timeout,mapContextName, LazyReplicatedMap.DEFAULT_INITIAL_CAPACITY,LazyReplicatedMap.DEFAULT_LOAD_FACTOR,Channel.SEND_OPTIONS_DEFAULT);
         }
 
 

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/ReplicatedMap.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/ReplicatedMap.java?rev=387560&r1=387559&r2=387560&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/ReplicatedMap.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/ReplicatedMap.java Tue Mar 21 08:07:12 2006
@@ -66,9 +66,8 @@
      * @param initialCapacity int - the size of this map, see HashMap
      * @param loadFactor float - load factor, see HashMap
      */
-    public ReplicatedMap(Channel channel, long timeout, String mapContextName, int initialCapacity,
-                             float loadFactor) {
-        super(channel, timeout, mapContextName, initialCapacity, loadFactor, Channel.SEND_OPTIONS_DEFAULT);
+    public ReplicatedMap(Object owner, Channel channel, long timeout, String mapContextName, int initialCapacity,float loadFactor) {
+        super(owner,channel, timeout, mapContextName, initialCapacity, loadFactor, Channel.SEND_OPTIONS_DEFAULT);
     }
 
     /**
@@ -78,8 +77,8 @@
      * @param mapContextName String - unique name for this map, to allow multiple maps per channel
      * @param initialCapacity int - the size of this map, see HashMap
      */
-    public ReplicatedMap(Channel channel, long timeout, String mapContextName, int initialCapacity) {
-        super(channel, timeout, mapContextName, initialCapacity, Channel.SEND_OPTIONS_DEFAULT);
+    public ReplicatedMap(Object owner, Channel channel, long timeout, String mapContextName, int initialCapacity) {
+        super(owner,channel, timeout, mapContextName, initialCapacity, AbstractReplicatedMap.DEFAULT_LOAD_FACTOR,Channel.SEND_OPTIONS_DEFAULT);
     }
 
     /**
@@ -88,8 +87,8 @@
      * @param timeout long - timeout for RPC messags
      * @param mapContextName String - unique name for this map, to allow multiple maps per channel
      */
-    public ReplicatedMap(Channel channel, long timeout, String mapContextName) {
-        super(channel, timeout, mapContextName, Channel.SEND_OPTIONS_DEFAULT);
+    public ReplicatedMap(Object owner, Channel channel, long timeout, String mapContextName) {
+        super(owner, channel, timeout, mapContextName,AbstractReplicatedMap.DEFAULT_INITIAL_CAPACITY, AbstractReplicatedMap.DEFAULT_LOAD_FACTOR, Channel.SEND_OPTIONS_DEFAULT);
     }
 
 //------------------------------------------------------------------------------

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/ReplicatedMapEntry.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/ReplicatedMapEntry.java?rev=387560&r1=387559&r2=387560&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/ReplicatedMapEntry.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/ReplicatedMapEntry.java Tue Mar 21 08:07:12 2006
@@ -15,13 +15,34 @@
  */
 package org.apache.catalina.tribes.tipis;
 
-import java.io.Serializable;
 import java.io.IOException;
-import org.apache.catalina.tribes.tcp.*;
+import java.io.Serializable;
 
 /**
  * 
- * For smarter replication, an object can implement this interface to replicate diffs
+ * For smarter replication, an object can implement this interface to replicate diffs<br>
+ * The replication logic will call the methods in the following order:<br>
+ * <code>
+ * 1. if ( entry.isDirty() ) {<br>
+ *      try {<br>
+ * 2.     entry.lock();<br>
+ * 3.     byte[] diff = entry.getDiff();<br>
+ * 4.     entry.resetDiff();<br>
+ *      } finally {<br>
+ * 5.     entry.unlock();<br>
+ *      }<br>
+ *    }<br>
+ * </code>
+ * <br>
+ * <br>
+ * When the data is deserialized the logic is called in the following order<br>
+ * <code>
+ * 1. ReplicatedMapEntry entry = (ReplicatedMapEntry)objectIn.readObject();<br>
+ * 2. if ( isBackup(entry)||isPrimary(entry) ) entry.setOwner(owner); <br>
+ * </code>
+ * <br>
+ * 
+ * 
  * @author Filip Hanik
  * @version 1.0
  */
@@ -34,8 +55,6 @@
      */
     public boolean isDirty();
     
-    public boolean setDirty(boolean dirty);
-    
     /**
      * If this returns true, the map will extract the diff using getDiff()
      * Otherwise it will serialize the entire object.
@@ -58,7 +77,33 @@
      * @param length int
      * @throws IOException
      */
-    public void applyDiff(byte[] diff, int offset, int length) throws IOException;
+    public void applyDiff(byte[] diff, int offset, int length) throws IOException, ClassNotFoundException;
+    
+    /**
+     * Resets the current diff state and resets the dirty flag
+     */
+    public void resetDiff();
+    
+    /**
+     * Lock during serialization
+     */
+    public void lock();
+    
+    /**
+     * Unlock after serialization
+     */
+    public void unlock();
+    
+    /**
+     * This method is called after the object has been
+     * created on a remote map. In this method,
+     * the object can initialize itself for any data that wasn't part of its serialized state.
+     * 
+     * @param owner Object
+     */
+    public void setOwner(Object owner);
+    
+    
     
     
 }

Modified: tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/ChannelCreator.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/ChannelCreator.java?rev=387560&r1=387559&r2=387560&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/ChannelCreator.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/ChannelCreator.java Tue Mar 21 08:07:12 2006
@@ -30,6 +30,7 @@
 import org.apache.catalina.tribes.tcp.ReplicationTransmitter;
 import org.apache.tomcat.util.IntrospectionUtils;
 import org.apache.catalina.tribes.tcp.*;
+import org.apache.catalina.tribes.group.interceptors.MessageDispatchInterceptor;
 
 /**
  * <p>Title: </p>
@@ -64,7 +65,9 @@
            .append("\n\t\t[-order]")
            .append("\n\t\t[-ordersize maxorderqueuesize]")
            .append("\n\t\t[-frag]")
-           .append("\n\t\t[-fragsize maxmsgsize]");
+           .append("\n\t\t[-fragsize maxmsgsize]")
+           .append("\n\t\t[-async]")
+           .append("\n\t\t[-asyncsize maxqueuesizeinbytes]");
        return buf;
 
     }
@@ -89,6 +92,8 @@
         Properties transportProperties = new Properties();
         String transport = "org.apache.catalina.tribes.tcp.nio.PooledParallelSender";
         String receiver = "org.apache.catalina.tribes.tcp.nio.NioReceiver";
+        boolean async = false;
+        int asyncsize = 1024*1024*50; //50MB
         
         for (int i = 0; i < args.length; i++) {
             if ("-bind".equals(args[i])) {
@@ -103,6 +108,11 @@
                 tcpthreadcount = Integer.parseInt(args[++i]);
             } else if ("-gzip".equals(args[i])) {
                 gzip = true;
+            } else if ("-async".equals(args[i])) {
+                async = true;
+            } else if ("-asyncsize".equals(args[i])) {
+                asyncsize = Integer.parseInt(args[++i]);
+                System.out.println("Setting MessageDispatchInterceptor.maxQueueSize="+asyncsize);
             } else if ("-order".equals(args[i])) {
                 order = true;
             } else if ("-ordersize".equals(args[i])) {
@@ -187,6 +197,13 @@
             oi.setMaxQueue(ordersize);
             channel.addInterceptor(oi);
         }
+        
+        if ( async ) {
+            MessageDispatchInterceptor mi = new MessageDispatchInterceptor();
+            mi.setMaxQueueSize(asyncsize);
+            channel.addInterceptor(mi);
+        }
+        
         return channel;
         
     }

Modified: tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/LoadTest.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/LoadTest.java?rev=387560&r1=387559&r2=387560&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/LoadTest.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/LoadTest.java Tue Mar 21 08:07:12 2006
@@ -55,6 +55,7 @@
     public int statsInterval = 10000;
     public long pause = 0;
     public boolean breakonChannelException = false;
+    public boolean async = false;
     public long receiveStart = 0;
     public int channelOptions = Channel.SEND_OPTIONS_DEFAULT;
     
@@ -144,7 +145,7 @@
                             Thread.sleep(pause);
                         }
                     } catch (ChannelException x) {
-                        log.error("Unable to send message.");
+                        log.error("Unable to send message:"+x.getMessage());
                         Member[] faulty = x.getFaultyMembers();
                         for (int i=0; i<faulty.length; i++ ) log.error("Faulty: "+faulty[i]);
                         --counter;

Modified: tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/MapDemo.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/MapDemo.java?rev=387560&r1=387559&r2=387560&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/MapDemo.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/MapDemo.java Tue Mar 21 08:07:12 2006
@@ -45,7 +45,7 @@
     protected SimpleTableDemo table;
     
     public MapDemo(Channel channel ) {
-        map = new LazyReplicatedMap(channel,5000, "MapDemo");
+        map = new LazyReplicatedMap(this,channel,5000, "MapDemo");
         table = SimpleTableDemo.createAndShowGUI(map,channel.getLocalMember(false).getName());
         channel.addChannelListener(this);
         channel.addMembershipListener(this);

Modified: tomcat/container/tc5.5.x/modules/groupcom/to-do.txt
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/to-do.txt?rev=387560&r1=387559&r2=387560&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/to-do.txt (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/to-do.txt Tue Mar 21 08:07:12 2006
@@ -29,6 +29,12 @@
 
 Code Tasks:
 ===========================================
+32. Replicated JNDI entries in Tomcat in the format
+    cluster:<map name>/<entry key> for example
+    cluster:myapps/db/shared/dbinfo
+
+31. A layer on top of the GroupChannel, to allow multiple processes to share
+    a channel to send/receive messages, conserving system resources - this is way in the future.
 
 30. CookieBasedReplicationMap - a very simple extension to the LazyReplicatedMap
     but instead of randomly selecting a backup node and then publishing the PROXY to all
@@ -50,17 +56,6 @@
 25. Member.uniqueId - 16 bytes unique for a member, UUID
     Needed to not confuse a crashed member with a revived member on the same port
 
-24. MessageDispatchInterceptor - for asynchronous sending
-    - looks at the options flag SEND_OPTIONS_ASYNCHRONOUS
-    - has two modes 
-      a) async parallel send - each message to all destinations before next message
-      b) async per/member - one thread per member using the FastAsyncQueue (good for groups with slow receivers)
-    - (optional)persistent - writes messages to persistent store first, then starts processing
-    - Callback error handler - for when messages fail, and the application wishes to become notified
-    - MUST HAVE A LIMIT QUEUE SIZE IN MB, to avoid OOM errors or persist the queue.
-    - MUST USE ClusterData.deepclone() to ensure thread safety if ClusterData objects get recycled
-
-
 23. TotalOrderInterceptor - fairly straight forward implementation
     This interceptor would depend on the fact that there is some sort of 
     membership coordinator, see task 9.
@@ -131,7 +126,8 @@
 
 16. Guaranteed delivery of messages, ie either all get it or none get it.
     Meaning, that all receivers get it, then wait for a process command.
-    ala Gossip protocol
+    ala Gossip protocol - this is fairly redundant with a Xa2PhaseCommitInterceptor
+    except it doesn't keep a transaction log.
 
 17. Implement transactions - the ability to start a transaction, send several messages,
                              and then commit the transaction
@@ -192,3 +188,14 @@
 Notes: see Channel.SEND_OPT_XXXX variables
 
 28. Thread pool should have maxThreads and minThreads and grow dynamically
+
+24. MessageDispatchInterceptor - for asynchronous sending
+    - looks at the options flag SEND_OPTIONS_ASYNCHRONOUS
+    - has two modes 
+      a) async parallel send - each message to all destinations before next message
+      b) async per/member - one thread per member using the FastAsyncQueue (good for groups with slow receivers)
+    - Callback error handler - for when messages fail, and the application wishes to become notified
+    - MUST HAVE A LIMIT QUEUE SIZE IN MB, to avoid OOM errors or persist the queue.
+    - MUST USE ClusterData.deepclone() to ensure thread safety if ClusterData objects get recycled
+Notes: Simple implementation, one thread, invokes all senders in parallel.
+       Deep cloning is configurable as optimization.



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org