You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by fh...@apache.org on 2006/03/07 22:46:33 UTC

svn commit: r384023 - in /tomcat/container/tc5.5.x/modules/groupcom: src/share/org/apache/catalina/tribes/tipis/RpcChannel.java test/org/apache/catalina/tribes/demos/EchoRpcTest.java

Author: fhanik
Date: Tue Mar  7 13:46:32 2006
New Revision: 384023

URL: http://svn.apache.org/viewcvs?rev=384023&view=rev
Log:
Finished the Rpc channel and a sample demo

Added:
    tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/EchoRpcTest.java
Modified:
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/RpcChannel.java

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/RpcChannel.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/RpcChannel.java?rev=384023&r1=384022&r2=384023&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/RpcChannel.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/RpcChannel.java Tue Mar  7 13:46:32 2006
@@ -35,6 +35,7 @@
  * @author Filip Hanik
  */
 public class RpcChannel implements ChannelListener{
+    protected static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog(RpcChannel.class);
     
     public static final int FIRST_REPLY = 1;
     public static final int MAJORITY_REPLY = 2;
@@ -57,7 +58,7 @@
         this.channel = channel;
         this.callback = callback;
         this.rpcId = rpcId;
-        //channel.addChannelListener(this);
+        channel.addChannelListener(this);
     }
     
     
@@ -75,6 +76,7 @@
                            int options, 
                            long timeout) throws ChannelException, InterruptedException {
         
+        if ( destination==null || destination.length == 0 ) return new Response[0];
         RpcCollectorKey key = new RpcCollectorKey(UUIDGenerator.randomUUID(false));
         RpcCollector collector = new RpcCollector(key,options,destination.length,timeout);
         synchronized (collector) {
@@ -90,16 +92,26 @@
     public void messageReceived(Serializable msg, Member sender) {
         RpcMessage rmsg = (RpcMessage)msg;
         RpcCollectorKey key = new RpcCollectorKey(rmsg.uuid);
-        RpcCollector collector = (RpcCollector)responseMap.get(key);
-        if ( collector == null ) {
-            callback.leftOver(rmsg.message,sender);
-        } else {
-            synchronized (collector) {
-                collector.addResponse(rmsg.message,sender);
-                if ( collector.isComplete() ) collector.notifyAll();
+        if ( rmsg.reply ) {
+            RpcCollector collector = (RpcCollector)responseMap.get(key);
+            if (collector == null) {
+                callback.leftOver(rmsg.message, sender);
+            } else {
+                synchronized (collector) {
+                    collector.addResponse(rmsg.message, sender);
+                    if (collector.isComplete()) collector.notifyAll();
+                }//synchronized
+            }//end if
+        } else{
+            Serializable reply = callback.replyRequest(rmsg.message,sender);
+            rmsg.reply = true;
+            rmsg.message = reply;
+            try {
+                channel.send(new Member[] {sender}, rmsg);
+            }catch ( Exception x )  {
+                log.error("Unable to send back reply in RpcChannel.",x);
             }
-        }
-        
+        }//end if
     }
     
     public boolean accept(Serializable msg, Member sender) {
@@ -138,6 +150,11 @@
         private Serializable message;
         private byte[] uuid;
         private byte[] rpcId;
+        private boolean reply = false;
+
+        public RpcMessage() {
+            //for serialization
+        }
         
         public RpcMessage(byte[] rpcId, byte[] uuid, Serializable message) {
             this.rpcId = rpcId;
@@ -146,6 +163,7 @@
         }
         
         public void readExternal(ObjectInput in) throws IOException,ClassNotFoundException {
+            reply = in.readBoolean();
             int length = in.readInt();
             uuid = new byte[length];
             in.read(uuid, 0, length);
@@ -156,6 +174,7 @@
         }
     
         public void writeExternal(ObjectOutput out) throws IOException {
+            out.writeBoolean(reply);
             out.writeInt(uuid.length);
             out.write(uuid, 0, uuid.length);
             out.writeInt(rpcId.length);

Added: tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/EchoRpcTest.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/EchoRpcTest.java?rev=384023&view=auto
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/EchoRpcTest.java (added)
+++ tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/EchoRpcTest.java Tue Mar  7 13:46:32 2006
@@ -0,0 +1,202 @@
+package org.apache.catalina.tribes.demos;
+
+import java.io.Serializable;
+
+import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.tipis.RpcCallback;
+import org.apache.catalina.tribes.Channel;
+import org.apache.catalina.tribes.ManagedChannel;
+import org.apache.catalina.tribes.tipis.RpcChannel;
+import org.apache.catalina.tribes.tipis.Response;
+
+
+/**
+ * <p>Title: </p>
+ *
+ * <p>Description: </p>
+ *
+ * <p>Copyright: Copyright (c) 2005</p>
+ *
+ * <p>Company: </p>
+ *
+ * @author not attributable
+ * @version 1.0
+ */
+public class EchoRpcTest implements RpcCallback, Runnable {
+    
+    Channel channel;
+    int count;
+    String message;
+    long pause;
+    RpcChannel rpc;
+    int options;
+    long timeout;
+    
+    public EchoRpcTest(Channel channel, String name, int count, String message, long pause, int options, long timeout) {
+        this.channel = channel;
+        this.count = count;
+        this.message = message;
+        this.pause = pause;
+        this.options = options;
+        this.rpc = new RpcChannel(name.getBytes(),channel,this);
+        this.timeout = timeout;
+    }
+
+    /**
+     * If the reply has already been sent to the requesting thread, the rpc
+     * callback can handle any data that comes in after the fact.
+     *
+     * @param msg Serializable
+     * @param sender Member
+     * @todo Implement this org.apache.catalina.tribes.tipis.RpcCallback
+     *   method
+     */
+    public void leftOver(Serializable msg, Member sender) {
+        System.out.println("Received a left over message from ["+sender.getName()+"] with data ["+msg+"]");
+    }
+
+    /**
+     *
+     * @param msg Serializable
+     * @param sender Member
+     * @return Serializable - null if no reply should be sent
+     * @todo Implement this org.apache.catalina.tribes.tipis.RpcCallback
+     *   method
+     */
+    public Serializable replyRequest(Serializable msg, Member sender) {
+        System.out.println("Received a reply request message from ["+sender.getName()+"] with data ["+msg+"]");
+        return "Reply:"+msg;
+    }
+    
+    public void run() {
+        long counter = 0;
+        while (counter<count) {
+            String msg = message + " cnt="+(++counter);
+            try {
+                System.out.println("Sending ["+msg+"]");
+                long start = System.currentTimeMillis();
+                Response[] resp = rpc.send(channel.getMembers(),(Serializable)msg,options,timeout);
+                System.out.println("Send of ["+msg+"] completed. Nr of responses="+resp.length+" Time:"+(System.currentTimeMillis()-start)+" ms.");
+                for ( int i=0; i<resp.length; i++ ) {
+                    System.out.println("Received a response message from ["+resp[i].getSource().getName()+"] with data ["+resp[i].getMessage()+"]");
+                }
+            Thread.sleep(pause);
+        }catch(Exception x){}
+        }
+    }
+    
+    public static void usage() {
+            System.out.println("Tribes RPC tester.");
+            System.out.println("Usage:\n\t"+
+                               "java EchoRpcTest [options]\n\t"+
+                               "Options:\n\t\t"+
+                               "[-mode all|first|majority]  \n\t\t"+
+                               "[-debug]  \n\t\t"+
+                               "[-count messagecount]  \n\t\t"+
+                               "[-timeout timeoutinms]  \n\t\t"+
+                               "[-stats statinterval]  \n\t\t"+
+                               "[-pause nrofsecondstopausebetweensends]  \n\t\t"+
+                               "[-message message]  \n\t\t"+
+                               "[-name rpcname]  \n\t\t"+
+                               "[-break (halts execution on exception)]\n"+
+                               "\tChannel options:"+
+                               ChannelCreator.usage()+"\n\n"+
+                               "Example:\n\t"+
+                               "java EchoRpcTest -port 4004\n\t"+
+                               "java EchoRpcTest -bind 192.168.0.45 -port 4005\n\t"+
+                               "java EchoRpcTest -bind 192.168.0.45 -port 4005 -mbind 192.168.0.45 -count 100 -stats 10\n");
+        }
+    
+        public static void main(String[] args) throws Exception {
+            boolean send = true;
+            boolean debug = false;
+            long pause = 3000;
+            int count = 1000000;
+            int stats = 10000;
+            String name = "EchoRpcId";
+            boolean breakOnEx = false;
+            int threads = 1;
+            int options = RpcChannel.ALL_REPLY;
+            long timeout = 15000;
+            String message = "EchoRpcMessage";
+            if ( args.length == 0 ) {
+                args = new String[] {"-help"};
+            }
+            for (int i = 0; i < args.length; i++) {
+                if ("-threads".equals(args[i])) {
+                    threads = Integer.parseInt(args[++i]);
+                } else if ("-count".equals(args[i])) {
+                    count = Integer.parseInt(args[++i]);
+                    System.out.println("Sending "+count+" messages.");
+                } else if ("-pause".equals(args[i])) {
+                    pause = Long.parseLong(args[++i])*1000;
+                } else if ("-break".equals(args[i])) {
+                    breakOnEx = true;
+                } else if ("-stats".equals(args[i])) {
+                    stats = Integer.parseInt(args[++i]);
+                    System.out.println("Stats every "+stats+" message");
+                } else if ("-timeout".equals(args[i])) {
+                    timeout = Long.parseLong(args[++i]);
+                } else if ("-message".equals(args[i])) {
+                    message = args[++i];
+                } else if ("-name".equals(args[i])) {
+                    name = args[++i];
+                } else if ("-mode".equals(args[i])) {
+                    if ( "all".equals(args[++i]) ) options = RpcChannel.ALL_REPLY;
+                    else if ( "first".equals(args[i]) ) options = RpcChannel.FIRST_REPLY;
+                    else if ( "majority".equals(args[i]) ) options = RpcChannel.MAJORITY_REPLY;
+                } else if ("-debug".equals(args[i])) {
+                    debug = true;
+                } else if ("-help".equals(args[i])) 
+                {
+                    usage();
+                    System.exit(1);
+                }
+            }
+    
+    
+            ManagedChannel channel = (ManagedChannel)ChannelCreator.createChannel(args);
+            EchoRpcTest test = new EchoRpcTest(channel,name,count,message,pause,options,timeout);
+            channel.start(channel.DEFAULT);
+            Runtime.getRuntime().addShutdownHook(new Shutdown(channel));
+            test.run();
+    
+            System.out.println("System test complete, sleeping to let threads finish.");
+            Thread.sleep(60*1000*60);
+        } 
+    
+        public static class Shutdown extends Thread {
+            ManagedChannel channel = null;
+            public Shutdown(ManagedChannel channel) {
+                this.channel = channel;
+            }
+    
+            public void run() {
+                System.out.println("Shutting down...");
+                SystemExit exit = new SystemExit(5000);
+                exit.setDaemon(true);
+                exit.start();
+                try {
+                    channel.stop(channel.DEFAULT);
+    
+                }catch ( Exception x ) {
+                    x.printStackTrace();
+                }
+                System.out.println("Channel stopped.");
+            }
+        }
+        public static class SystemExit extends Thread {
+            private long delay;
+            public SystemExit(long delay) {
+                this.delay = delay;
+            }
+            public void run () {
+                try {
+                    Thread.sleep(delay);
+                }catch ( Exception x ) {
+                    x.printStackTrace();
+                }
+                System.exit(0);
+    
+            }
+    }}
\ No newline at end of file



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org