You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by fh...@apache.org on 2006/03/07 22:46:33 UTC
svn commit: r384023 - in /tomcat/container/tc5.5.x/modules/groupcom:
src/share/org/apache/catalina/tribes/tipis/RpcChannel.java
test/org/apache/catalina/tribes/demos/EchoRpcTest.java
Author: fhanik
Date: Tue Mar 7 13:46:32 2006
New Revision: 384023
URL: http://svn.apache.org/viewcvs?rev=384023&view=rev
Log:
Finished the Rpc channel and a sample demo
Added:
tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/EchoRpcTest.java
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/RpcChannel.java
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/RpcChannel.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/RpcChannel.java?rev=384023&r1=384022&r2=384023&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/RpcChannel.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/RpcChannel.java Tue Mar 7 13:46:32 2006
@@ -35,6 +35,7 @@
* @author Filip Hanik
*/
public class RpcChannel implements ChannelListener{
+ protected static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog(RpcChannel.class);
public static final int FIRST_REPLY = 1;
public static final int MAJORITY_REPLY = 2;
@@ -57,7 +58,7 @@
this.channel = channel;
this.callback = callback;
this.rpcId = rpcId;
- //channel.addChannelListener(this);
+ channel.addChannelListener(this);
}
@@ -75,6 +76,7 @@
int options,
long timeout) throws ChannelException, InterruptedException {
+ if ( destination==null || destination.length == 0 ) return new Response[0];
RpcCollectorKey key = new RpcCollectorKey(UUIDGenerator.randomUUID(false));
RpcCollector collector = new RpcCollector(key,options,destination.length,timeout);
synchronized (collector) {
@@ -90,16 +92,26 @@
public void messageReceived(Serializable msg, Member sender) {
RpcMessage rmsg = (RpcMessage)msg;
RpcCollectorKey key = new RpcCollectorKey(rmsg.uuid);
- RpcCollector collector = (RpcCollector)responseMap.get(key);
- if ( collector == null ) {
- callback.leftOver(rmsg.message,sender);
- } else {
- synchronized (collector) {
- collector.addResponse(rmsg.message,sender);
- if ( collector.isComplete() ) collector.notifyAll();
+ if ( rmsg.reply ) {
+ RpcCollector collector = (RpcCollector)responseMap.get(key);
+ if (collector == null) {
+ callback.leftOver(rmsg.message, sender);
+ } else {
+ synchronized (collector) {
+ collector.addResponse(rmsg.message, sender);
+ if (collector.isComplete()) collector.notifyAll();
+ }//synchronized
+ }//end if
+ } else{
+ Serializable reply = callback.replyRequest(rmsg.message,sender);
+ rmsg.reply = true;
+ rmsg.message = reply;
+ try {
+ channel.send(new Member[] {sender}, rmsg);
+ }catch ( Exception x ) {
+ log.error("Unable to send back reply in RpcChannel.",x);
}
- }
-
+ }//end if
}
public boolean accept(Serializable msg, Member sender) {
@@ -138,6 +150,11 @@
private Serializable message;
private byte[] uuid;
private byte[] rpcId;
+ private boolean reply = false;
+
+ public RpcMessage() {
+ //for serialization
+ }
public RpcMessage(byte[] rpcId, byte[] uuid, Serializable message) {
this.rpcId = rpcId;
@@ -146,6 +163,7 @@
}
public void readExternal(ObjectInput in) throws IOException,ClassNotFoundException {
+ reply = in.readBoolean();
int length = in.readInt();
uuid = new byte[length];
in.read(uuid, 0, length);
@@ -156,6 +174,7 @@
}
public void writeExternal(ObjectOutput out) throws IOException {
+ out.writeBoolean(reply);
out.writeInt(uuid.length);
out.write(uuid, 0, uuid.length);
out.writeInt(rpcId.length);
Added: tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/EchoRpcTest.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/EchoRpcTest.java?rev=384023&view=auto
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/EchoRpcTest.java (added)
+++ tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/EchoRpcTest.java Tue Mar 7 13:46:32 2006
@@ -0,0 +1,202 @@
+package org.apache.catalina.tribes.demos;
+
+import java.io.Serializable;
+
+import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.tipis.RpcCallback;
+import org.apache.catalina.tribes.Channel;
+import org.apache.catalina.tribes.ManagedChannel;
+import org.apache.catalina.tribes.tipis.RpcChannel;
+import org.apache.catalina.tribes.tipis.Response;
+
+
+/**
+ * <p>Title: </p>
+ *
+ * <p>Description: </p>
+ *
+ * <p>Copyright: Copyright (c) 2005</p>
+ *
+ * <p>Company: </p>
+ *
+ * @author not attributable
+ * @version 1.0
+ */
+public class EchoRpcTest implements RpcCallback, Runnable {
+
+ Channel channel;
+ int count;
+ String message;
+ long pause;
+ RpcChannel rpc;
+ int options;
+ long timeout;
+
+ public EchoRpcTest(Channel channel, String name, int count, String message, long pause, int options, long timeout) {
+ this.channel = channel;
+ this.count = count;
+ this.message = message;
+ this.pause = pause;
+ this.options = options;
+ this.rpc = new RpcChannel(name.getBytes(),channel,this);
+ this.timeout = timeout;
+ }
+
+ /**
+ * If the reply has already been sent to the requesting thread, the rpc
+ * callback can handle any data that comes in after the fact.
+ *
+ * @param msg Serializable
+ * @param sender Member
+ * @todo Implement this org.apache.catalina.tribes.tipis.RpcCallback
+ * method
+ */
+ public void leftOver(Serializable msg, Member sender) {
+ System.out.println("Received a left over message from ["+sender.getName()+"] with data ["+msg+"]");
+ }
+
+ /**
+ *
+ * @param msg Serializable
+ * @param sender Member
+ * @return Serializable - null if no reply should be sent
+ * @todo Implement this org.apache.catalina.tribes.tipis.RpcCallback
+ * method
+ */
+ public Serializable replyRequest(Serializable msg, Member sender) {
+ System.out.println("Received a reply request message from ["+sender.getName()+"] with data ["+msg+"]");
+ return "Reply:"+msg;
+ }
+
+ public void run() {
+ long counter = 0;
+ while (counter<count) {
+ String msg = message + " cnt="+(++counter);
+ try {
+ System.out.println("Sending ["+msg+"]");
+ long start = System.currentTimeMillis();
+ Response[] resp = rpc.send(channel.getMembers(),(Serializable)msg,options,timeout);
+ System.out.println("Send of ["+msg+"] completed. Nr of responses="+resp.length+" Time:"+(System.currentTimeMillis()-start)+" ms.");
+ for ( int i=0; i<resp.length; i++ ) {
+ System.out.println("Received a response message from ["+resp[i].getSource().getName()+"] with data ["+resp[i].getMessage()+"]");
+ }
+ Thread.sleep(pause);
+ }catch(Exception x){}
+ }
+ }
+
+ public static void usage() {
+ System.out.println("Tribes RPC tester.");
+ System.out.println("Usage:\n\t"+
+ "java EchoRpcTest [options]\n\t"+
+ "Options:\n\t\t"+
+ "[-mode all|first|majority] \n\t\t"+
+ "[-debug] \n\t\t"+
+ "[-count messagecount] \n\t\t"+
+ "[-timeout timeoutinms] \n\t\t"+
+ "[-stats statinterval] \n\t\t"+
+ "[-pause nrofsecondstopausebetweensends] \n\t\t"+
+ "[-message message] \n\t\t"+
+ "[-name rpcname] \n\t\t"+
+ "[-break (halts execution on exception)]\n"+
+ "\tChannel options:"+
+ ChannelCreator.usage()+"\n\n"+
+ "Example:\n\t"+
+ "java EchoRpcTest -port 4004\n\t"+
+ "java EchoRpcTest -bind 192.168.0.45 -port 4005\n\t"+
+ "java EchoRpcTest -bind 192.168.0.45 -port 4005 -mbind 192.168.0.45 -count 100 -stats 10\n");
+ }
+
+ public static void main(String[] args) throws Exception {
+ boolean send = true;
+ boolean debug = false;
+ long pause = 3000;
+ int count = 1000000;
+ int stats = 10000;
+ String name = "EchoRpcId";
+ boolean breakOnEx = false;
+ int threads = 1;
+ int options = RpcChannel.ALL_REPLY;
+ long timeout = 15000;
+ String message = "EchoRpcMessage";
+ if ( args.length == 0 ) {
+ args = new String[] {"-help"};
+ }
+ for (int i = 0; i < args.length; i++) {
+ if ("-threads".equals(args[i])) {
+ threads = Integer.parseInt(args[++i]);
+ } else if ("-count".equals(args[i])) {
+ count = Integer.parseInt(args[++i]);
+ System.out.println("Sending "+count+" messages.");
+ } else if ("-pause".equals(args[i])) {
+ pause = Long.parseLong(args[++i])*1000;
+ } else if ("-break".equals(args[i])) {
+ breakOnEx = true;
+ } else if ("-stats".equals(args[i])) {
+ stats = Integer.parseInt(args[++i]);
+ System.out.println("Stats every "+stats+" message");
+ } else if ("-timeout".equals(args[i])) {
+ timeout = Long.parseLong(args[++i]);
+ } else if ("-message".equals(args[i])) {
+ message = args[++i];
+ } else if ("-name".equals(args[i])) {
+ name = args[++i];
+ } else if ("-mode".equals(args[i])) {
+ if ( "all".equals(args[++i]) ) options = RpcChannel.ALL_REPLY;
+ else if ( "first".equals(args[i]) ) options = RpcChannel.FIRST_REPLY;
+ else if ( "majority".equals(args[i]) ) options = RpcChannel.MAJORITY_REPLY;
+ } else if ("-debug".equals(args[i])) {
+ debug = true;
+ } else if ("-help".equals(args[i]))
+ {
+ usage();
+ System.exit(1);
+ }
+ }
+
+
+ ManagedChannel channel = (ManagedChannel)ChannelCreator.createChannel(args);
+ EchoRpcTest test = new EchoRpcTest(channel,name,count,message,pause,options,timeout);
+ channel.start(channel.DEFAULT);
+ Runtime.getRuntime().addShutdownHook(new Shutdown(channel));
+ test.run();
+
+ System.out.println("System test complete, sleeping to let threads finish.");
+ Thread.sleep(60*1000*60);
+ }
+
+ public static class Shutdown extends Thread {
+ ManagedChannel channel = null;
+ public Shutdown(ManagedChannel channel) {
+ this.channel = channel;
+ }
+
+ public void run() {
+ System.out.println("Shutting down...");
+ SystemExit exit = new SystemExit(5000);
+ exit.setDaemon(true);
+ exit.start();
+ try {
+ channel.stop(channel.DEFAULT);
+
+ }catch ( Exception x ) {
+ x.printStackTrace();
+ }
+ System.out.println("Channel stopped.");
+ }
+ }
+ public static class SystemExit extends Thread {
+ private long delay;
+ public SystemExit(long delay) {
+ this.delay = delay;
+ }
+ public void run () {
+ try {
+ Thread.sleep(delay);
+ }catch ( Exception x ) {
+ x.printStackTrace();
+ }
+ System.exit(0);
+
+ }
+ }}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org