You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by fh...@apache.org on 2006/03/21 17:07:14 UTC
svn commit: r387560 - in /tomcat/container/tc5.5.x/modules/groupcom: ./
src/share/org/apache/catalina/tribes/
src/share/org/apache/catalina/tribes/group/interceptors/
src/share/org/apache/catalina/tribes/tcp/bio/util/
src/share/org/apache/catalina/trib...
Author: fhanik
Date: Tue Mar 21 08:07:12 2006
New Revision: 387560
URL: http://svn.apache.org/viewcvs?rev=387560&view=rev
Log:
Added documentation for the ReplicatedMapEntry and added the async replication to the demos
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelMessage.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/MessageDispatchInterceptor.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/util/FastQueue.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/ReplicatedMap.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/ReplicatedMapEntry.java
tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/ChannelCreator.java
tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/LoadTest.java
tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/MapDemo.java
tomcat/container/tc5.5.x/modules/groupcom/to-do.txt
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelMessage.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelMessage.java?rev=387560&r1=387559&r2=387560&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelMessage.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/ChannelMessage.java Tue Mar 21 08:07:12 2006
@@ -72,6 +72,15 @@
public int getOptions();
public void setOptions(int options);
+ /**
+ * Shallow clone, only the actual message(getMessage()) is cloned, the rest remains as references
+ * @return ChannelMessage
+ */
public ChannelMessage clone();
+ /**
+ * Deep clone, everything gets cloned
+ * @return ChannelMessage
+ */
+ public ChannelMessage deepclone();
}
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/MessageDispatchInterceptor.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/MessageDispatchInterceptor.java?rev=387560&r1=387559&r2=387560&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/MessageDispatchInterceptor.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/MessageDispatchInterceptor.java Tue Mar 21 08:07:12 2006
@@ -15,6 +15,8 @@
package org.apache.catalina.tribes.group.interceptors;
+import java.util.concurrent.atomic.AtomicLong;
+
import org.apache.catalina.tribes.Channel;
import org.apache.catalina.tribes.ChannelException;
import org.apache.catalina.tribes.ChannelMessage;
@@ -23,7 +25,6 @@
import org.apache.catalina.tribes.group.InterceptorPayload;
import org.apache.catalina.tribes.tcp.bio.util.FastQueue;
import org.apache.catalina.tribes.tcp.bio.util.LinkObject;
-import java.util.concurrent.atomic.AtomicLong;
/**
*
@@ -44,22 +45,23 @@
private boolean run = false;
private Thread msgDispatchThread = null;
private AtomicLong currentSize = new AtomicLong(0);
-
+ private boolean useDeepClone = false;
public void sendMessage(Member[] destination, ChannelMessage msg, InterceptorPayload payload) throws ChannelException {
boolean async = (msg.getOptions() & Channel.SEND_OPTIONS_ASYNCHRONOUS) == Channel.SEND_OPTIONS_ASYNCHRONOUS;
if ( async && run ) {
if ( (currentSize.get()+msg.getMessage().getLength()) > maxQueueSize ) throw new ChannelException("Asynchronous queue is full, reached its limit of "+maxQueueSize+" bytes, current:"+currentSize+" bytes.");
//add to queue
- queue.add(msg, destination, payload);
+ if ( useDeepClone ) msg = msg.deepclone();
+ if (!queue.add(msg, destination, payload) ) {
+ throw new ChannelException("Unable to add the message to the async queue, queue bug?");
+ }
currentSize.addAndGet(msg.getMessage().getLength());
} else {
super.sendMessage(destination, msg, payload);
}
}
-
-
public void messageReceived(ChannelMessage msg) {
super.messageReceived(msg);
}
@@ -78,10 +80,18 @@
this.maxQueueSize = maxQueueSize;
}
+ public void setUseDeepClone(boolean useDeepClone) {
+ this.useDeepClone = useDeepClone;
+ }
+
public long getMaxQueueSize() {
return maxQueueSize;
}
-
+
+ public boolean getUseDeepClone() {
+ return useDeepClone;
+ }
+
public void start(int svc) throws ChannelException {
//start the thread
if (!run ) {
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/util/FastQueue.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/util/FastQueue.java?rev=387560&r1=387559&r2=387560&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/util/FastQueue.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/util/FastQueue.java Tue Mar 21 08:07:12 2006
@@ -174,6 +174,7 @@
enabled = enable;
if (!enabled) {
lock.abortRemove();
+ last = first = null;
}
}
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java?rev=387560&r1=387559&r2=387560&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java Tue Mar 21 08:07:12 2006
@@ -55,6 +55,16 @@
public abstract class AbstractReplicatedMap extends LinkedHashMap implements RpcCallback, ChannelListener, MembershipListener {
protected static Log log = LogFactory.getLog(AbstractReplicatedMap.class);
+ /**
+ * The default initial capacity - MUST be a power of two.
+ */
+ public static final int DEFAULT_INITIAL_CAPACITY = 16;
+
+ /**
+ * The load factor used when none specified in constructor.
+ **/
+ public static final float DEFAULT_LOAD_FACTOR = 0.75f;
+
//------------------------------------------------------------------------------
// INSTANCE VARIABLES
//------------------------------------------------------------------------------
@@ -65,8 +75,8 @@
private transient boolean stateTransferred = false;
private transient Object stateMutex = new Object();
private transient ArrayList mapMembers = new ArrayList();
-
private transient int channelSendOptions = Channel.SEND_OPTIONS_DEFAULT;
+ private transient Object mapOwner;
//------------------------------------------------------------------------------
// CONSTRUCTORS
@@ -80,52 +90,24 @@
* @param initialCapacity int - the size of this map, see HashMap
* @param loadFactor float - load factor, see HashMap
*/
- public AbstractReplicatedMap(Channel channel,
+ public AbstractReplicatedMap(Object owner,
+ Channel channel,
long timeout,
String mapContextName,
int initialCapacity,
float loadFactor,
int channelSendOptions) {
super(initialCapacity, loadFactor);
- init(channel, mapContextName, timeout, channelSendOptions);
+ init(owner, channel, mapContextName, timeout, channelSendOptions);
}
- /**
- * Creates a new map
- * @param channel The channel to use for communication
- * @param timeout long - timeout for RPC messags
- * @param mapContextName String - unique name for this map, to allow multiple maps per channel
- * @param initialCapacity int - the size of this map, see HashMap
- */
- public AbstractReplicatedMap(Channel channel,
- long timeout,
- String mapContextName,
- int initialCapacity,
- int channelSendOptions) {
- super(initialCapacity);
- init(channel, mapContextName, timeout, channelSendOptions);
- }
-
- /**
- * Creates a new map
- * @param channel The channel to use for communication
- * @param timeout long - timeout for RPC messags
- * @param mapContextName String - unique name for this map, to allow multiple maps per channel
- */
- public AbstractReplicatedMap(Channel channel,
- long timeout,
- String mapContextName,
- int channelSendOptions) {
- super();
- init(channel, mapContextName, timeout, channelSendOptions);
- }
-
protected Member[] wrap(Member m) {
return new Member[] {m};
}
- private void init(Channel channel, String mapContextName, long timeout, int channelSendOptions) {
+ private void init(Object owner, Channel channel, String mapContextName, long timeout, int channelSendOptions) {
+ this.mapOwner = owner;
final String chset = "ISO-8859-1";
this.channelSendOptions = channelSendOptions;
this.channel = channel;
@@ -277,9 +259,12 @@
//make sure we don't store that actual object as primary or backup
MapEntry local = (MapEntry)super.get(m.getKey());
- if (local != null && (!local.isProxy()))continue;
+ if (local != null && (!local.isProxy())) continue;
//store the object
+ if (m.getValue()!=null && m.getValue() instanceof ReplicatedMapEntry ) {
+ ((ReplicatedMapEntry)m.getValue()).setOwner(getMapOwner());
+ }
MapEntry entry = new MapEntry(m.getKey(), m.getValue());
entry.setBackup(false);
entry.setProxy(true);
@@ -390,6 +375,9 @@
entry.setBackup(true);
entry.setProxy(false);
entry.setBackupNodes(mapmsg.getBackupNodes());
+ if (mapmsg.getValue()!=null && mapmsg.getValue() instanceof ReplicatedMapEntry ) {
+ ((ReplicatedMapEntry)mapmsg.getValue()).setOwner(getMapOwner());
+ }
super.put(entry.getKey(), entry);
} else {
entry.setBackup(true);
@@ -400,7 +388,7 @@
if (mapmsg.isDiff()) {
try {
diff.applyDiff(mapmsg.getDiffValue(), 0, mapmsg.getDiffValue().length);
- } catch (IOException x) {
+ } catch (Exception x) {
log.error("Unable to apply diff to key:" + entry.getKey(), x);
}
} else {
@@ -865,6 +853,14 @@
public boolean isStateTransferred() {
return stateTransferred;
+ }
+
+ public Object getMapOwner() {
+ return mapOwner;
+ }
+
+ public void setMapOwner(Object mapOwner) {
+ this.mapOwner = mapOwner;
}
}
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java?rev=387560&r1=387559&r2=387560&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java Tue Mar 21 08:07:12 2006
@@ -84,8 +84,8 @@
* @param initialCapacity int - the size of this map, see HashMap
* @param loadFactor float - load factor, see HashMap
*/
- public LazyReplicatedMap(Channel channel, long timeout, String mapContextName, int initialCapacity, float loadFactor) {
- super(channel,timeout,mapContextName,initialCapacity,loadFactor, Channel.SEND_OPTIONS_DEFAULT);
+ public LazyReplicatedMap(Object owner, Channel channel, long timeout, String mapContextName, int initialCapacity, float loadFactor) {
+ super(owner,channel,timeout,mapContextName,initialCapacity,loadFactor, Channel.SEND_OPTIONS_DEFAULT);
}
/**
@@ -95,8 +95,8 @@
* @param mapContextName String - unique name for this map, to allow multiple maps per channel
* @param initialCapacity int - the size of this map, see HashMap
*/
- public LazyReplicatedMap(Channel channel, long timeout, String mapContextName, int initialCapacity) {
- super(channel,timeout,mapContextName,initialCapacity, Channel.SEND_OPTIONS_DEFAULT);
+ public LazyReplicatedMap(Object owner, Channel channel, long timeout, String mapContextName, int initialCapacity) {
+ super(owner, channel,timeout,mapContextName,initialCapacity, LazyReplicatedMap.DEFAULT_LOAD_FACTOR, Channel.SEND_OPTIONS_DEFAULT);
}
/**
@@ -105,8 +105,8 @@
* @param timeout long - timeout for RPC messags
* @param mapContextName String - unique name for this map, to allow multiple maps per channel
*/
- public LazyReplicatedMap(Channel channel, long timeout, String mapContextName) {
- super(channel,timeout,mapContextName, Channel.SEND_OPTIONS_DEFAULT);
+ public LazyReplicatedMap(Object owner, Channel channel, long timeout, String mapContextName) {
+ super(owner, channel,timeout,mapContextName, LazyReplicatedMap.DEFAULT_INITIAL_CAPACITY,LazyReplicatedMap.DEFAULT_LOAD_FACTOR,Channel.SEND_OPTIONS_DEFAULT);
}
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/ReplicatedMap.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/ReplicatedMap.java?rev=387560&r1=387559&r2=387560&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/ReplicatedMap.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/ReplicatedMap.java Tue Mar 21 08:07:12 2006
@@ -66,9 +66,8 @@
* @param initialCapacity int - the size of this map, see HashMap
* @param loadFactor float - load factor, see HashMap
*/
- public ReplicatedMap(Channel channel, long timeout, String mapContextName, int initialCapacity,
- float loadFactor) {
- super(channel, timeout, mapContextName, initialCapacity, loadFactor, Channel.SEND_OPTIONS_DEFAULT);
+ public ReplicatedMap(Object owner, Channel channel, long timeout, String mapContextName, int initialCapacity,float loadFactor) {
+ super(owner,channel, timeout, mapContextName, initialCapacity, loadFactor, Channel.SEND_OPTIONS_DEFAULT);
}
/**
@@ -78,8 +77,8 @@
* @param mapContextName String - unique name for this map, to allow multiple maps per channel
* @param initialCapacity int - the size of this map, see HashMap
*/
- public ReplicatedMap(Channel channel, long timeout, String mapContextName, int initialCapacity) {
- super(channel, timeout, mapContextName, initialCapacity, Channel.SEND_OPTIONS_DEFAULT);
+ public ReplicatedMap(Object owner, Channel channel, long timeout, String mapContextName, int initialCapacity) {
+ super(owner,channel, timeout, mapContextName, initialCapacity, AbstractReplicatedMap.DEFAULT_LOAD_FACTOR,Channel.SEND_OPTIONS_DEFAULT);
}
/**
@@ -88,8 +87,8 @@
* @param timeout long - timeout for RPC messags
* @param mapContextName String - unique name for this map, to allow multiple maps per channel
*/
- public ReplicatedMap(Channel channel, long timeout, String mapContextName) {
- super(channel, timeout, mapContextName, Channel.SEND_OPTIONS_DEFAULT);
+ public ReplicatedMap(Object owner, Channel channel, long timeout, String mapContextName) {
+ super(owner, channel, timeout, mapContextName,AbstractReplicatedMap.DEFAULT_INITIAL_CAPACITY, AbstractReplicatedMap.DEFAULT_LOAD_FACTOR, Channel.SEND_OPTIONS_DEFAULT);
}
//------------------------------------------------------------------------------
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/ReplicatedMapEntry.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/ReplicatedMapEntry.java?rev=387560&r1=387559&r2=387560&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/ReplicatedMapEntry.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/ReplicatedMapEntry.java Tue Mar 21 08:07:12 2006
@@ -15,13 +15,34 @@
*/
package org.apache.catalina.tribes.tipis;
-import java.io.Serializable;
import java.io.IOException;
-import org.apache.catalina.tribes.tcp.*;
+import java.io.Serializable;
/**
*
- * For smarter replication, an object can implement this interface to replicate diffs
+ * For smarter replication, an object can implement this interface to replicate diffs<br>
+ * The replication logic will call the methods in the following order:<br>
+ * <code>
+ * 1. if ( entry.isDirty() ) <br>
+ * try {
+ * 2. entry.lock();<br>
+ * 3. byte[] diff = entry.getDiff();<br>
+ * 4. entry.reset();<br>
+ * } finally {<br>
+ * 5. entry.unlock();<br>
+ * }<br>
+ * }<br>
+ * </code>
+ * <br>
+ * <br>
+ * When the data is deserialized the logic is called in the following order<br>
+ * <code>
+ * 1. ReplicatedMapEntry entry = (ReplicatedMapEntry)objectIn.readObject();<br>
+ * 2. if ( isBackup(entry)||isPrimary(entry) ) entry.setOwner(owner); <br>
+ * </code>
+ * <br>
+ *
+ *
* @author Filip Hanik
* @version 1.0
*/
@@ -34,8 +55,6 @@
*/
public boolean isDirty();
- public boolean setDirty(boolean dirty);
-
/**
* If this returns true, the map will extract the diff using getDiff()
* Otherwise it will serialize the entire object.
@@ -58,7 +77,33 @@
* @param length int
* @throws IOException
*/
- public void applyDiff(byte[] diff, int offset, int length) throws IOException;
+ public void applyDiff(byte[] diff, int offset, int length) throws IOException, ClassNotFoundException;
+
+ /**
+ * Resets the current diff state and resets the dirty flag
+ */
+ public void resetDiff();
+
+ /**
+ * Lock during serialization
+ */
+ public void lock();
+
+ /**
+ * Unlock after serialization
+ */
+ public void unlock();
+
+ /**
+ * This method is called after the object has been
+ * created on a remote map. On this method,
+ * the object can initialize itself for any data that wasn't
+ *
+ * @param owner Object
+ */
+ public void setOwner(Object owner);
+
+
}
Modified: tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/ChannelCreator.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/ChannelCreator.java?rev=387560&r1=387559&r2=387560&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/ChannelCreator.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/ChannelCreator.java Tue Mar 21 08:07:12 2006
@@ -30,6 +30,7 @@
import org.apache.catalina.tribes.tcp.ReplicationTransmitter;
import org.apache.tomcat.util.IntrospectionUtils;
import org.apache.catalina.tribes.tcp.*;
+import org.apache.catalina.tribes.group.interceptors.MessageDispatchInterceptor;
/**
* <p>Title: </p>
@@ -64,7 +65,9 @@
.append("\n\t\t[-order]")
.append("\n\t\t[-ordersize maxorderqueuesize]")
.append("\n\t\t[-frag]")
- .append("\n\t\t[-fragsize maxmsgsize]");
+ .append("\n\t\t[-fragsize maxmsgsize]")
+ .append("\n\t\t[-async]")
+ .append("\n\t\t[-asyncsize maxqueuesizeinbytes]");
return buf;
}
@@ -89,6 +92,8 @@
Properties transportProperties = new Properties();
String transport = "org.apache.catalina.tribes.tcp.nio.PooledParallelSender";
String receiver = "org.apache.catalina.tribes.tcp.nio.NioReceiver";
+ boolean async = false;
+ int asyncsize = 1024*1024*50; //50MB
for (int i = 0; i < args.length; i++) {
if ("-bind".equals(args[i])) {
@@ -103,6 +108,11 @@
tcpthreadcount = Integer.parseInt(args[++i]);
} else if ("-gzip".equals(args[i])) {
gzip = true;
+ } else if ("-async".equals(args[i])) {
+ async = true;
+ } else if ("-asyncsize".equals(args[i])) {
+ asyncsize = Integer.parseInt(args[++i]);
+ System.out.println("Setting MessageDispatchInterceptor.maxQueueSize="+asyncsize);
} else if ("-order".equals(args[i])) {
order = true;
} else if ("-ordersize".equals(args[i])) {
@@ -187,6 +197,13 @@
oi.setMaxQueue(ordersize);
channel.addInterceptor(oi);
}
+
+ if ( async ) {
+ MessageDispatchInterceptor mi = new MessageDispatchInterceptor();
+ mi.setMaxQueueSize(asyncsize);
+ channel.addInterceptor(mi);
+ }
+
return channel;
}
Modified: tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/LoadTest.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/LoadTest.java?rev=387560&r1=387559&r2=387560&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/LoadTest.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/LoadTest.java Tue Mar 21 08:07:12 2006
@@ -55,6 +55,7 @@
public int statsInterval = 10000;
public long pause = 0;
public boolean breakonChannelException = false;
+ public boolean async = false;
public long receiveStart = 0;
public int channelOptions = Channel.SEND_OPTIONS_DEFAULT;
@@ -144,7 +145,7 @@
Thread.sleep(pause);
}
} catch (ChannelException x) {
- log.error("Unable to send message.");
+ log.error("Unable to send message:"+x.getMessage());
Member[] faulty = x.getFaultyMembers();
for (int i=0; i<faulty.length; i++ ) log.error("Faulty: "+faulty[i]);
--counter;
Modified: tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/MapDemo.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/MapDemo.java?rev=387560&r1=387559&r2=387560&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/MapDemo.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/MapDemo.java Tue Mar 21 08:07:12 2006
@@ -45,7 +45,7 @@
protected SimpleTableDemo table;
public MapDemo(Channel channel ) {
- map = new LazyReplicatedMap(channel,5000, "MapDemo");
+ map = new LazyReplicatedMap(this,channel,5000, "MapDemo");
table = SimpleTableDemo.createAndShowGUI(map,channel.getLocalMember(false).getName());
channel.addChannelListener(this);
channel.addMembershipListener(this);
Modified: tomcat/container/tc5.5.x/modules/groupcom/to-do.txt
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/to-do.txt?rev=387560&r1=387559&r2=387560&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/to-do.txt (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/to-do.txt Tue Mar 21 08:07:12 2006
@@ -29,6 +29,12 @@
Code Tasks:
===========================================
+32. Replicated JNDI entries in Tomcat in the format
+ cluster:<map name>/<entry key> for example
+ cluster:myapps/db/shared/dbinfo
+
+31. A layer on top of the GroupChannel, to allow multiple processes share
+ a channel to send/receive message to conserve system resources - this is way in the future.
30. CookieBasedReplicationMap - a very simple extension to the LazyReplicatedMap
but instead of randomly selecting a backup node and then publishing the PROXY to all
@@ -50,17 +56,6 @@
25. Member.uniqueId - 16 bytes unique for a member, UUID
Needed to not confuse a crashed member with a revived member on the same port
-24. MessageDispatchInterceptor - for asynchronous sending
- - looks at the options flag SEND_OPTIONS_ASYNCHRONOUS
- - has two modes
- a) async parallel send - each message to all destinations before next message
- b) async per/member - one thread per member using the FastAsyncQueue (good for groups with slow receivers)
- - (optional)persistent - writes messages to persistent store first, then starts processing
- - Callback error handler - for when messages fail, and the application wishes to become notified
- - MUST HAVE A LIMIT QUEUE SIZE IN MB, to avoid OOM errors or persist the queue.
- - MUST USE ClusterData.deepclone() to ensure thread safety if ClusterData objects get recycled
-
-
23. TotalOrderInterceptor - fairly straight forward implementation
This interceptor would depend on the fact that there is some sort of
membership coordinator, see task 9.
@@ -131,7 +126,8 @@
16. Guaranteed delivery of messages, ie either all get it or none get it.
Meaning, that all receivers get it, then wait for a process command.
- ala Gossip protocol
+ ala Gossip protocol - this is fairly redundant with a Xa2PhaseCommitInterceptor
+ except it doesn't keep a transaction log.
17. Implement transactions - the ability to start a transaction, send several messages,
and then commit the transaction
@@ -192,3 +188,14 @@
Notes: see Channel.SEND_OPT_XXXX variables
28. Thread pool should have maxThreads and minThreads and grow dynamically
+
+24. MessageDispatchInterceptor - for asynchronous sending
+ - looks at the options flag SEND_OPTIONS_ASYNCHRONOUS
+ - has two modes
+ a) async parallel send - each message to all destinations before next message
+ b) async per/member - one thread per member using the FastAsyncQueue (good for groups with slow receivers)
+ - Callback error handler - for when messages fail, and the application wishes to become notified
+ - MUST HAVE A LIMIT QUEUE SIZE IN MB, to avoid OOM errors or persist the queue.
+ - MUST USE ClusterData.deepclone() to ensure thread safety if ClusterData objects get recycled
+Notes: Simple implementation, one thread, invokes all senders in parallel.
+ Deep cloning is configurable as optimization.
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org