You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by fh...@apache.org on 2006/07/03 15:24:14 UTC
svn commit: r418764 - in /tomcat/container/tc5.5.x/modules/groupcom: ./
src/share/org/apache/catalina/tribes/group/interceptors/
src/share/org/apache/catalina/tribes/transport/
src/share/org/apache/catalina/tribes/transport/nio/
test/java/org/apache/ca...
Author: fhanik
Date: Mon Jul 3 06:24:13 2006
New Revision: 418764
URL: http://svn.apache.org/viewvc?rev=418764&view=rev
Log:
Fixes to test cases mostly, some minor changes in the code base
Added:
tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/test/channel/TestRemoteProcessException.java
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/StaticMembershipInterceptor.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/ThroughputInterceptor.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/ThreadPool.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReceiver.java
tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/ChannelCreator.java
tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/LoadTest.java
tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/SocketNioSend.java
tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/SocketReceive.java
tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/test/TribesTestSuite.java
tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/test/channel/TestDataIntegrity.java
tomcat/container/tc5.5.x/modules/groupcom/to-do.txt
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/StaticMembershipInterceptor.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/StaticMembershipInterceptor.java?rev=418764&r1=418763&r2=418764&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/StaticMembershipInterceptor.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/StaticMembershipInterceptor.java Mon Jul 3 06:24:13 2006
@@ -4,6 +4,8 @@
import org.apache.catalina.tribes.Member;
import java.util.ArrayList;
import org.apache.catalina.tribes.group.AbsoluteOrder;
+import org.apache.catalina.tribes.ChannelException;
+import org.apache.catalina.tribes.Channel;
public class StaticMembershipInterceptor
extends ChannelInterceptorBase {
@@ -73,6 +75,27 @@
public Member getLocalMember(boolean incAlive) {
if (this.localMember != null ) return localMember;
else return super.getLocalMember(incAlive);
+ }
+
+ /**
+ * Send notifications upwards
+ * @param svc int
+ * @throws ChannelException
+ */
+ public void start(int svc) throws ChannelException {
+ if ( (Channel.SND_RX_SEQ&svc)==Channel.SND_RX_SEQ ) super.start(Channel.SND_RX_SEQ);
+ if ( (Channel.SND_TX_SEQ&svc)==Channel.SND_TX_SEQ ) super.start(Channel.SND_TX_SEQ);
+ final Member[] mbrs = (Member[])members.toArray(new Member[members.size()]);
+ final ChannelInterceptorBase base = this;
+ Thread t = new Thread() {
+ public void run() {
+ for (int i=0; i<mbrs.length; i++ ) {
+ base.memberAdded(mbrs[i]);
+ }
+ }
+ };
+ t.start();
+ super.start(svc & (~Channel.SND_RX_SEQ) & (~Channel.SND_TX_SEQ));
}
}
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/ThroughputInterceptor.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/ThroughputInterceptor.java?rev=418764&r1=418763&r2=418764&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/ThroughputInterceptor.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/ThroughputInterceptor.java Mon Jul 3 06:24:13 2006
@@ -42,6 +42,7 @@
double mbAppTx = 0;
double mbRx = 0;
double timeTx = 0;
+ double lastCnt = 0;
AtomicLong msgTxCnt = new AtomicLong(1);
AtomicLong msgRxCnt = new AtomicLong(0);
AtomicLong msgTxErr = new AtomicLong(0);
@@ -67,16 +68,10 @@
if ( access.addAndGet(-1) == 0 ) {
long stop = System.currentTimeMillis();
timeTx += ( (double) (stop - txStart)) / 1000d;
- }
-
- if (msgTxCnt.get() % interval == 0) {
- double time = timeTx;
-
- if ( access.get() != 0 ) {
- long now = System.currentTimeMillis();
- time = (double)(now - txStart + timeTx)/1000d;
+ if ((msgTxCnt.get() / interval) >= lastCnt) {
+ lastCnt++;
+ report(timeTx);
}
- report(time);
}
msgTxCnt.addAndGet(1);
}
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/ThreadPool.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/ThreadPool.java?rev=418764&r1=418763&r2=418764&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/ThreadPool.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/ThreadPool.java Mon Jul 3 06:24:13 2006
@@ -78,8 +78,6 @@
public WorkerThread getWorker()
{
WorkerThread worker = null;
-
-
synchronized (mutex) {
while ( worker == null && running ) {
if (idle.size() > 0) {
@@ -113,7 +111,8 @@
if ( running ) {
synchronized (mutex) {
used.remove(worker);
- if ( idle.size() < minThreads && !idle.contains(worker)) idle.add(worker);
+ //if ( idle.size() < minThreads && !idle.contains(worker)) idle.add(worker);
+ if ( idle.size() < maxThreads && !idle.contains(worker)) idle.add(worker); //let max be the upper limit
else {
worker.setDoRun(false);
synchronized (worker){worker.notify();}
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReceiver.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReceiver.java?rev=418764&r1=418763&r2=418764&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReceiver.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReceiver.java Mon Jul 3 06:24:13 2006
@@ -363,8 +363,8 @@
if (worker == null) {
// No threads available, do nothing, the selection
// loop will keep calling this method until a
- // thread becomes available.
- // FIXME: This design could be improved.
+ // thread becomes available, the thread pool itself has a waiting mechanism
+ // so we will not wait here.
if (log.isDebugEnabled())
log.debug("No TcpReplicationThread available");
} else {
Modified: tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/ChannelCreator.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/ChannelCreator.java?rev=418764&r1=418763&r2=418764&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/ChannelCreator.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/ChannelCreator.java Mon Jul 3 06:24:13 2006
@@ -33,6 +33,10 @@
import org.apache.catalina.tribes.group.interceptors.MessageDispatch15Interceptor;
import org.apache.catalina.tribes.group.interceptors.TcpFailureDetector;
import org.apache.catalina.tribes.group.interceptors.DomainFilterInterceptor;
+import java.util.ArrayList;
+import org.apache.catalina.tribes.membership.MemberImpl;
+import org.apache.catalina.tribes.group.interceptors.StaticMembershipInterceptor;
+import org.apache.catalina.tribes.Member;
/**
* <p>Title: </p>
@@ -65,6 +69,7 @@
.append("\n\t\t[-mfreq multicastfrequency]")
.append("\n\t\t[-mdrop multicastdroptime]")
.append("\n\t\t[-gzip]")
+ .append("\n\t\t[-static hostname:port (-static localhost:9999 -static 127.0.0.1:8888 can be repeated)]")
.append("\n\t\t[-order]")
.append("\n\t\t[-ordersize maxorderqueuesize]")
.append("\n\t\t[-frag]")
@@ -94,6 +99,7 @@
boolean frag = false;
int fragsize = 1024;
int autoBind = 10;
+ ArrayList staticMembers = new ArrayList();
Properties transportProperties = new Properties();
String transport = "org.apache.catalina.tribes.transport.nio.PooledParallelSender";
String receiver = "org.apache.catalina.tribes.transport.nio.NioReceiver";
@@ -122,6 +128,12 @@
} else if ("-asyncsize".equals(args[i])) {
asyncsize = Integer.parseInt(args[++i]);
System.out.println("Setting MessageDispatchInterceptor.maxQueueSize="+asyncsize);
+ } else if ("-static".equals(args[i])) {
+ String d = args[++i];
+ String h = d.substring(0,d.indexOf(":"));
+ String p = d.substring(h.length()+1);
+ MemberImpl m = new MemberImpl(h,Integer.parseInt(p),2000);
+ staticMembers.add(m);
} else if ("-throughput".equals(args[i])) {
throughput = true;
} else if ("-order".equals(args[i])) {
@@ -196,7 +208,7 @@
channel.setChannelReceiver(rx);
channel.setChannelSender(ps);
channel.setMembershipService(service);
-
+
if ( throughput ) channel.addInterceptor(new ThroughputInterceptor());
if (gzip) channel.addInterceptor(new GzipInterceptor());
if ( frag ) {
@@ -221,6 +233,15 @@
TcpFailureDetector tcpfi = new TcpFailureDetector();
channel.addInterceptor(tcpfi);
}
+ if ( staticMembers.size() > 0 ) {
+ StaticMembershipInterceptor smi = new StaticMembershipInterceptor();
+ for (int x=0; x<staticMembers.size(); x++ ) {
+ smi.addStaticMember((Member)staticMembers.get(x));
+ }
+ channel.addInterceptor(smi);
+ }
+
+
byte[] domain = new byte[] {1,2,3,4,5,6,7,8,9,0};
((McastService)channel.getMembershipService()).setDomain(domain);
DomainFilterInterceptor filter = new DomainFilterInterceptor();
Modified: tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/LoadTest.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/LoadTest.java?rev=418764&r1=418763&r2=418764&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/LoadTest.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/LoadTest.java Mon Jul 3 06:24:13 2006
@@ -293,6 +293,7 @@
"java LoadTest [options]\n\t"+
"Options:\n\t\t"+
"[-mode receive|send|both] \n\t\t"+
+ "[-startoptions startflags (default is Channel.DEFAULT) ] \n\t\t"+
"[-debug] \n\t\t"+
"[-count messagecount] \n\t\t"+
"[-stats statinterval] \n\t\t"+
@@ -319,6 +320,7 @@
boolean breakOnEx = false;
int threads = 1;
boolean shutdown = false;
+ int startoptions = Channel.DEFAULT;
int channelOptions = Channel.SEND_OPTIONS_DEFAULT;
if ( args.length == 0 ) {
args = new String[] {"-help"};
@@ -341,6 +343,9 @@
} else if ("-sendoptions".equals(args[i])) {
channelOptions = Integer.parseInt(args[++i]);
System.out.println("Setting send options to "+channelOptions);
+ } else if ("-startoptions".equals(args[i])) {
+ startoptions = Integer.parseInt(args[++i]);
+ System.out.println("Setting start options to "+startoptions);
} else if ("-size".equals(args[i])) {
size = Integer.parseInt(args[++i])-4;
System.out.println("Message size will be:"+(size+4)+" bytes");
@@ -355,7 +360,6 @@
}
}
-
ManagedChannel channel = (ManagedChannel)ChannelCreator.createChannel(args);
LoadTest test = new LoadTest(channel,send,count,debug,pause,stats,breakOnEx);
@@ -365,7 +369,7 @@
messageSize = LoadMessage.getMessageSize(msg);
channel.addChannelListener(test);
channel.addMembershipListener(test);
- channel.start(channel.DEFAULT);
+ channel.start(startoptions);
Runtime.getRuntime().addShutdownHook(new Shutdown(channel));
while ( threads > 1 ) {
Thread t = new Thread(test);
Modified: tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/SocketNioSend.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/SocketNioSend.java?rev=418764&r1=418763&r2=418764&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/SocketNioSend.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/SocketNioSend.java Mon Jul 3 06:24:13 2006
@@ -19,6 +19,7 @@
Selector selector = Selector.open();
Member mbr = new MemberImpl("localhost", 9999, 0);
ChannelData data = new ChannelData();
+ data.setOptions(Channel.SEND_OPTIONS_BYTE_MESSAGE);
data.setAddress(mbr);
byte[] buf = new byte[8192 * 4];
data.setMessage(new XByteBuffer(buf,false));
Modified: tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/SocketReceive.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/SocketReceive.java?rev=418764&r1=418763&r2=418764&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/SocketReceive.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/SocketReceive.java Mon Jul 3 06:24:13 2006
@@ -22,6 +22,7 @@
while ( true ) {
if ( first ) { first = false; start = System.currentTimeMillis();}
int len = in.read(buf);
+ if ( len == -1 ) System.exit(1);
mb += ( (double) len) / 1024 / 1024;
if ( ((count++) % 10000) == 0 ) {
long time = System.currentTimeMillis();
Modified: tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/test/TribesTestSuite.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/test/TribesTestSuite.java?rev=418764&r1=418763&r2=418764&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/test/TribesTestSuite.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/test/TribesTestSuite.java Mon Jul 3 06:24:13 2006
@@ -18,6 +18,7 @@
suite.addTestSuite(org.apache.catalina.tribes.test.membership.MemberSerialization.class);
suite.addTestSuite(org.apache.catalina.tribes.test.membership.TestMemberArrival.class);
suite.addTestSuite(org.apache.catalina.tribes.test.membership.TestTcpFailureDetector.class);
+ suite.addTestSuite(org.apache.catalina.tribes.test.channel.TestDataIntegrity.class);
return suite;
}
}
Modified: tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/test/channel/TestDataIntegrity.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/test/channel/TestDataIntegrity.java?rev=418764&r1=418763&r2=418764&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/test/channel/TestDataIntegrity.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/test/channel/TestDataIntegrity.java Mon Jul 3 06:24:13 2006
@@ -22,7 +22,7 @@
* @version 1.0
*/
public class TestDataIntegrity extends TestCase {
- int msgCount = 1000;
+ int msgCount = 10000;
GroupChannel channel1;
GroupChannel channel2;
Listener listener1;
Added: tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/test/channel/TestRemoteProcessException.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/test/channel/TestRemoteProcessException.java?rev=418764&view=auto
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/test/channel/TestRemoteProcessException.java (added)
+++ tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/test/channel/TestRemoteProcessException.java Mon Jul 3 06:24:13 2006
@@ -0,0 +1,116 @@
+package org.apache.catalina.tribes.test.channel;
+
+import junit.framework.TestCase;
+import java.io.Serializable;
+import java.util.Random;
+import java.util.Arrays;
+import org.apache.catalina.tribes.ChannelListener;
+import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.group.GroupChannel;
+
+/**
+ * <p>Title: </p>
+ *
+ * <p>Description: </p>
+ *
+ * <p>Copyright: Copyright (c) 2005</p>
+ *
+ * <p>Company: </p>
+ *
+ * @author not attributable
+ * @version 1.0
+ */
+public class TestRemoteProcessException extends TestCase {
+ int msgCount = 10000;
+ GroupChannel channel1;
+ GroupChannel channel2;
+ Listener listener1;
+ protected void setUp() throws Exception {
+ super.setUp();
+ channel1 = new GroupChannel();
+ channel2 = new GroupChannel();
+ listener1 = new Listener();
+ channel2.addChannelListener(listener1);
+ channel1.start(GroupChannel.DEFAULT);
+ channel2.start(GroupChannel.DEFAULT);
+ }
+
+ protected void tearDown() throws Exception {
+ super.tearDown();
+ channel1.stop(GroupChannel.DEFAULT);
+ channel2.stop(GroupChannel.DEFAULT);
+ }
+
+ public void testDataSendSYNCACK() throws Exception {
+ System.err.println("Starting SYNC_ACK");
+ int errC=0, nerrC=0;
+ for (int i=0; i<msgCount; i++) {
+ boolean error = Data.r.nextBoolean();
+ channel1.send(channel1.getMembers(),Data.createRandomData(error),GroupChannel.SEND_OPTIONS_SYNCHRONIZED_ACK|GroupChannel.SEND_OPTIONS_USE_ACK);
+ if ( error ) errC++; else nerrC++;
+ }
+ System.err.println("Finished SYNC_ACK");
+ assertEquals("Checking failure messages.",errC,listener1.errCnt);
+ assertEquals("Checking success messages.",nerrC,listener1.noErrCnt);
+ assertEquals("Checking all messages.",msgCount,listener1.noErrCnt+listener1.errCnt);
+ }
+
+ public static class Listener implements ChannelListener {
+ long noErrCnt = 0;
+ long errCnt = 0;
+ public boolean accept(Serializable s, Member m) {
+ return (s instanceof Data);
+ }
+
+ public void messageReceived(Serializable s, Member m) {
+ Data d = (Data)s;
+ if ( !Data.verify(d) ) {
+ System.err.println("ERROR");
+ } else {
+ if (d.error) {
+ errCnt++;
+ if ( (errCnt % 100) == 0) {
+ System.err.println("NORMAL:" + noErrCnt);
+ System.err.println("FAILURES:" + errCnt);
+ System.err.println("TOTAL:" + errCnt+noErrCnt);
+ }
+ } else {
+ noErrCnt++;
+ if ( (noErrCnt % 100) == 0) {
+ System.err.println("NORMAL:" + noErrCnt);
+ System.err.println("FAILURES:" + errCnt);
+ System.err.println("TOTAL:" + errCnt+noErrCnt);
+ }
+ }
+ }
+ }
+ }
+
+ public static class Data implements Serializable {
+ public int length;
+ public byte[] data;
+ public byte key;
+ public boolean error = false;
+ public static Random r = new Random(System.currentTimeMillis());
+ public static Data createRandomData(boolean error) {
+ int i = r.nextInt();
+ i = ( i % 127 );
+ int length = Math.abs(r.nextInt() % 65555);
+ Data d = new Data();
+ d.length = length;
+ d.key = (byte)i;
+ d.data = new byte[length];
+ Arrays.fill(d.data,d.key);
+ return d;
+ }
+
+ public static boolean verify(Data d) {
+ boolean result = (d.length == d.data.length);
+ for ( int i=0; result && (i<d.data.length); i++ ) result = result && d.data[i] == d.key;
+ return result;
+ }
+ }
+
+
+
+}
Modified: tomcat/container/tc5.5.x/modules/groupcom/to-do.txt
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/to-do.txt?rev=418764&r1=418763&r2=418764&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/to-do.txt (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/to-do.txt Mon Jul 3 06:24:13 2006
@@ -41,6 +41,9 @@
Code Tasks:
===========================================
+51. NioSender.setData should not expand the byte buffer if its too large
+ instead just refill it from the XByteBuffer
+
50. On top of versioning, implement version syncs from primary to backup
Or when a backup receives an update that is out of sync
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org