You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by fh...@apache.org on 2006/05/19 00:34:15 UTC

svn commit: r407648 - in /tomcat/container/tc5.5.x/modules/groupcom: src/share/org/apache/catalina/tribes/group/interceptors/ src/share/org/apache/catalina/tribes/transport/bio/util/ test/java/org/apache/catalina/tribes/demos/

Author: fhanik
Date: Thu May 18 15:34:14 2006
New Revision: 407648

URL: http://svn.apache.org/viewvc?rev=407648&view=rev
Log:
Implemented the async using java.util.concurrent for the 1.5 package. much better results

Modified:
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/MessageDispatch15Interceptor.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/MessageDispatchInterceptor.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/util/LinkObject.java
    tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/ChannelCreator.java

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/MessageDispatch15Interceptor.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/MessageDispatch15Interceptor.java?rev=407648&r1=407647&r2=407648&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/MessageDispatch15Interceptor.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/MessageDispatch15Interceptor.java Thu May 18 15:34:14 2006
@@ -14,8 +14,14 @@
  */
 package org.apache.catalina.tribes.group.interceptors;
 
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.catalina.tribes.ChannelMessage;
+import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.group.InterceptorPayload;
+import org.apache.catalina.tribes.transport.bio.util.LinkObject;
+
 /**
  * 
  * Same implementation as the MessageDispatchInterceptor
@@ -28,6 +34,7 @@
 public class MessageDispatch15Interceptor extends MessageDispatchInterceptor {
 
     protected AtomicLong currentSize = new AtomicLong(0);
+    protected LinkedBlockingQueue queue = new LinkedBlockingQueue();
 
     public long getCurrentSize() {
         return currentSize.get();
@@ -41,5 +48,34 @@
         currentSize.set(value);
         return value;
     }
+    
+    public boolean addToQueue(ChannelMessage msg, Member[] destination, InterceptorPayload payload) {
+        LinkObject obj = new LinkObject(msg,destination,payload);
+        return queue.offer(obj);
+    }
+
+    public LinkObject removeFromQueue() {
+        LinkObject head = null;
+        try {
+            head = (LinkObject)queue.take();
+        }catch ( InterruptedException x ) {}
+        return head;
+    }
+
+    public void startQueue() {
+        msgDispatchThread = new Thread(this);
+        msgDispatchThread.setName("MessageDispatch15Interceptor.MessageDispatchThread");
+        msgDispatchThread.setDaemon(true);
+        msgDispatchThread.setPriority(Thread.MAX_PRIORITY);
+        run = true;
+        msgDispatchThread.start();
+    }
+
+    public void stopQueue() {
+        run = false;
+        msgDispatchThread.interrupt();
+        setAndGetCurrentSize(0);
+    }
+
 
 }

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/MessageDispatchInterceptor.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/MessageDispatchInterceptor.java?rev=407648&r1=407647&r2=407648&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/MessageDispatchInterceptor.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/MessageDispatchInterceptor.java Thu May 18 15:34:14 2006
@@ -37,14 +37,15 @@
  * @version 1.0
  */
 public class MessageDispatchInterceptor extends ChannelInterceptorBase implements Runnable {
-    private static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog(MessageDispatchInterceptor.class);
+    protected static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog(MessageDispatchInterceptor.class);
 
-    private long maxQueueSize = 1024*1024*64; //64MB
-    private FastQueue queue = new FastQueue();
-    private boolean run = false;
-    private Thread msgDispatchThread = null;
+    protected long maxQueueSize = 1024*1024*64; //64MB
+    protected FastQueue queue = new FastQueue();
+    protected boolean run = false;
+    protected Thread msgDispatchThread = null;
     protected long currentSize = 0;
-    private boolean useDeepClone = true;
+    protected boolean useDeepClone = true;
+    protected boolean alwaysSend = true;
 
     public MessageDispatchInterceptor() {
         setOptionFlag(Channel.SEND_OPTIONS_ASYNCHRONOUS);
@@ -53,19 +54,51 @@
     public void sendMessage(Member[] destination, ChannelMessage msg, InterceptorPayload payload) throws ChannelException {
         boolean async = (msg.getOptions() & Channel.SEND_OPTIONS_ASYNCHRONOUS) == Channel.SEND_OPTIONS_ASYNCHRONOUS;
         if ( async && run ) {
-            if ( (getCurrentSize()+msg.getMessage().getLength()) > maxQueueSize ) throw new ChannelException("Asynchronous queue is full, reached its limit of "+maxQueueSize+" bytes, current:"+getCurrentSize()+" bytes.");
+            if ( (getCurrentSize()+msg.getMessage().getLength()) > maxQueueSize ) {
+                if ( alwaysSend ) {
+                    super.sendMessage(destination,msg,payload);
+                    return;
+                } else {
+                    throw new ChannelException("Asynchronous queue is full, reached its limit of " + maxQueueSize +" bytes, current:" + getCurrentSize() + " bytes.");
+                }//end if
+            }//end if
             //add to queue
             if ( useDeepClone ) msg = (ChannelMessage)msg.deepclone();
-            if (!queue.add(msg, destination, payload) ) {
+            if (!addToQueue(msg, destination, payload) ) {
                 throw new ChannelException("Unable to add the message to the async queue, queue bug?");
             }
             addAndGetCurrentSize(msg.getMessage().getLength());
         } else {
-            System.out.println("Not queueing the message");
             super.sendMessage(destination, msg, payload);
         }
     }
     
+    public boolean addToQueue(ChannelMessage msg, Member[] destination, InterceptorPayload payload) {
+        return queue.add(msg,destination,payload);
+    }
+    
+    public LinkObject removeFromQueue() {
+        return queue.remove();
+    }
+    
+    public void startQueue() {
+        msgDispatchThread = new Thread(this);
+        msgDispatchThread.setName("MessageDispatchInterceptor.MessageDispatchThread");
+        msgDispatchThread.setDaemon(true);
+        msgDispatchThread.setPriority(Thread.MAX_PRIORITY);
+        queue.setEnabled(true);
+        run = true;
+        msgDispatchThread.start();
+    }
+    
+    public void stopQueue() {
+        run = false;
+        msgDispatchThread.interrupt();
+        queue.setEnabled(false);
+        setAndGetCurrentSize(0);
+    }
+    
+    
     public void setOptionFlag(int flag) {
         if ( flag != Channel.SEND_OPTIONS_ASYNCHRONOUS ) log.warn("Warning, you are overriding the asynchronous option flag, this will disable the Channel.SEND_OPTIONS_ASYNCHRONOUS that other apps might use.");
         super.setOptionFlag(flag);
@@ -106,13 +139,7 @@
         if (!run ) {
             synchronized (this) {
                 if ( !run && ((svc & Channel.SND_TX_SEQ)==Channel.SND_TX_SEQ) ) {//only start with the sender
-                    msgDispatchThread = new Thread(this);
-                    msgDispatchThread.setName("MessageDispatchThread");
-                    msgDispatchThread.setDaemon(true);
-                    msgDispatchThread.setPriority(Thread.MAX_PRIORITY);
-                    queue.setEnabled(true);
-                    run = true;
-                    msgDispatchThread.start();
+                    startQueue();
                 }//end if
             }//sync
         }//end if
@@ -125,10 +152,7 @@
         if ( run ) {
             synchronized (this) {
                 if ( run && ((svc & Channel.SND_TX_SEQ)==svc)) {
-                    run = false;
-                    msgDispatchThread.interrupt();
-                    queue.setEnabled(false);
-                    setAndGetCurrentSize(0);
+                    stopQueue();
                 }//end if
             }//sync
         }//end if
@@ -138,7 +162,7 @@
     
     public void run() {
         while ( run ) {
-            LinkObject link = queue.remove();
+            LinkObject link = removeFromQueue();
             if ( link == null ) continue; //should not happen unless we exceed wait time
             while ( link != null && run ) {
                 ChannelMessage msg = link.data();

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/util/LinkObject.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/util/LinkObject.java?rev=407648&r1=407647&r2=407648&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/util/LinkObject.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/util/LinkObject.java Thu May 18 15:34:14 2006
@@ -71,6 +71,10 @@
     public LinkObject next() {
         return next;
     }
+    
+    public void setNext(LinkObject next) {
+        this.next = next;
+    }
 
     /**
      * Get the data object from the element.

Modified: tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/ChannelCreator.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/ChannelCreator.java?rev=407648&r1=407647&r2=407648&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/ChannelCreator.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/ChannelCreator.java Thu May 18 15:34:14 2006
@@ -71,7 +71,7 @@
            .append("\n\t\t[-throughput]")
            .append("\n\t\t[-failuredetect]")
            .append("\n\t\t[-async]")
-           .append("\n\t\t[-asyncsize maxqueuesizeinbytes]");
+           .append("\n\t\t[-asyncsize maxqueuesizeinkilobytes]");
        return buf;
 
     }



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org