You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by fh...@apache.org on 2006/05/19 00:34:15 UTC
svn commit: r407648 - in /tomcat/container/tc5.5.x/modules/groupcom:
src/share/org/apache/catalina/tribes/group/interceptors/
src/share/org/apache/catalina/tribes/transport/bio/util/
test/java/org/apache/catalina/tribes/demos/
Author: fhanik
Date: Thu May 18 15:34:14 2006
New Revision: 407648
URL: http://svn.apache.org/viewvc?rev=407648&view=rev
Log:
Implemented the async using java.util.concurrent for the 1.5 package. much better results
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/MessageDispatch15Interceptor.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/MessageDispatchInterceptor.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/util/LinkObject.java
tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/ChannelCreator.java
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/MessageDispatch15Interceptor.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/MessageDispatch15Interceptor.java?rev=407648&r1=407647&r2=407648&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/MessageDispatch15Interceptor.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/MessageDispatch15Interceptor.java Thu May 18 15:34:14 2006
@@ -14,8 +14,14 @@
*/
package org.apache.catalina.tribes.group.interceptors;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.catalina.tribes.ChannelMessage;
+import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.group.InterceptorPayload;
+import org.apache.catalina.tribes.transport.bio.util.LinkObject;
+
/**
*
* Same implementation as the MessageDispatchInterceptor
@@ -28,6 +34,7 @@
public class MessageDispatch15Interceptor extends MessageDispatchInterceptor {
protected AtomicLong currentSize = new AtomicLong(0);
+ protected LinkedBlockingQueue queue = new LinkedBlockingQueue();
public long getCurrentSize() {
return currentSize.get();
@@ -41,5 +48,34 @@
currentSize.set(value);
return value;
}
+
+ public boolean addToQueue(ChannelMessage msg, Member[] destination, InterceptorPayload payload) {
+ LinkObject obj = new LinkObject(msg,destination,payload);
+ return queue.offer(obj);
+ }
+
+ public LinkObject removeFromQueue() {
+ LinkObject head = null;
+ try {
+ head = (LinkObject)queue.take();
+ }catch ( InterruptedException x ) {}
+ return head;
+ }
+
+ public void startQueue() {
+ msgDispatchThread = new Thread(this);
+ msgDispatchThread.setName("MessageDispatch15Interceptor.MessageDispatchThread");
+ msgDispatchThread.setDaemon(true);
+ msgDispatchThread.setPriority(Thread.MAX_PRIORITY);
+ run = true;
+ msgDispatchThread.start();
+ }
+
+ public void stopQueue() {
+ run = false;
+ msgDispatchThread.interrupt();
+ setAndGetCurrentSize(0);
+ }
+
}
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/MessageDispatchInterceptor.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/MessageDispatchInterceptor.java?rev=407648&r1=407647&r2=407648&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/MessageDispatchInterceptor.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/MessageDispatchInterceptor.java Thu May 18 15:34:14 2006
@@ -37,14 +37,15 @@
* @version 1.0
*/
public class MessageDispatchInterceptor extends ChannelInterceptorBase implements Runnable {
- private static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog(MessageDispatchInterceptor.class);
+ protected static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog(MessageDispatchInterceptor.class);
- private long maxQueueSize = 1024*1024*64; //64MB
- private FastQueue queue = new FastQueue();
- private boolean run = false;
- private Thread msgDispatchThread = null;
+ protected long maxQueueSize = 1024*1024*64; //64MB
+ protected FastQueue queue = new FastQueue();
+ protected boolean run = false;
+ protected Thread msgDispatchThread = null;
protected long currentSize = 0;
- private boolean useDeepClone = true;
+ protected boolean useDeepClone = true;
+ protected boolean alwaysSend = true;
public MessageDispatchInterceptor() {
setOptionFlag(Channel.SEND_OPTIONS_ASYNCHRONOUS);
@@ -53,19 +54,51 @@
public void sendMessage(Member[] destination, ChannelMessage msg, InterceptorPayload payload) throws ChannelException {
boolean async = (msg.getOptions() & Channel.SEND_OPTIONS_ASYNCHRONOUS) == Channel.SEND_OPTIONS_ASYNCHRONOUS;
if ( async && run ) {
- if ( (getCurrentSize()+msg.getMessage().getLength()) > maxQueueSize ) throw new ChannelException("Asynchronous queue is full, reached its limit of "+maxQueueSize+" bytes, current:"+getCurrentSize()+" bytes.");
+ if ( (getCurrentSize()+msg.getMessage().getLength()) > maxQueueSize ) {
+ if ( alwaysSend ) {
+ super.sendMessage(destination,msg,payload);
+ return;
+ } else {
+ throw new ChannelException("Asynchronous queue is full, reached its limit of " + maxQueueSize +" bytes, current:" + getCurrentSize() + " bytes.");
+ }//end if
+ }//end if
//add to queue
if ( useDeepClone ) msg = (ChannelMessage)msg.deepclone();
- if (!queue.add(msg, destination, payload) ) {
+ if (!addToQueue(msg, destination, payload) ) {
throw new ChannelException("Unable to add the message to the async queue, queue bug?");
}
addAndGetCurrentSize(msg.getMessage().getLength());
} else {
- System.out.println("Not queueing the message");
super.sendMessage(destination, msg, payload);
}
}
+ public boolean addToQueue(ChannelMessage msg, Member[] destination, InterceptorPayload payload) {
+ return queue.add(msg,destination,payload);
+ }
+
+ public LinkObject removeFromQueue() {
+ return queue.remove();
+ }
+
+ public void startQueue() {
+ msgDispatchThread = new Thread(this);
+ msgDispatchThread.setName("MessageDispatchInterceptor.MessageDispatchThread");
+ msgDispatchThread.setDaemon(true);
+ msgDispatchThread.setPriority(Thread.MAX_PRIORITY);
+ queue.setEnabled(true);
+ run = true;
+ msgDispatchThread.start();
+ }
+
+ public void stopQueue() {
+ run = false;
+ msgDispatchThread.interrupt();
+ queue.setEnabled(false);
+ setAndGetCurrentSize(0);
+ }
+
+
public void setOptionFlag(int flag) {
if ( flag != Channel.SEND_OPTIONS_ASYNCHRONOUS ) log.warn("Warning, you are overriding the asynchronous option flag, this will disable the Channel.SEND_OPTIONS_ASYNCHRONOUS that other apps might use.");
super.setOptionFlag(flag);
@@ -106,13 +139,7 @@
if (!run ) {
synchronized (this) {
if ( !run && ((svc & Channel.SND_TX_SEQ)==Channel.SND_TX_SEQ) ) {//only start with the sender
- msgDispatchThread = new Thread(this);
- msgDispatchThread.setName("MessageDispatchThread");
- msgDispatchThread.setDaemon(true);
- msgDispatchThread.setPriority(Thread.MAX_PRIORITY);
- queue.setEnabled(true);
- run = true;
- msgDispatchThread.start();
+ startQueue();
}//end if
}//sync
}//end if
@@ -125,10 +152,7 @@
if ( run ) {
synchronized (this) {
if ( run && ((svc & Channel.SND_TX_SEQ)==svc)) {
- run = false;
- msgDispatchThread.interrupt();
- queue.setEnabled(false);
- setAndGetCurrentSize(0);
+ stopQueue();
}//end if
}//sync
}//end if
@@ -138,7 +162,7 @@
public void run() {
while ( run ) {
- LinkObject link = queue.remove();
+ LinkObject link = removeFromQueue();
if ( link == null ) continue; //should not happen unless we exceed wait time
while ( link != null && run ) {
ChannelMessage msg = link.data();
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/util/LinkObject.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/util/LinkObject.java?rev=407648&r1=407647&r2=407648&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/util/LinkObject.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/util/LinkObject.java Thu May 18 15:34:14 2006
@@ -71,6 +71,10 @@
public LinkObject next() {
return next;
}
+
+ public void setNext(LinkObject next) {
+ this.next = next;
+ }
/**
* Get the data object from the element.
Modified: tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/ChannelCreator.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/ChannelCreator.java?rev=407648&r1=407647&r2=407648&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/ChannelCreator.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/ChannelCreator.java Thu May 18 15:34:14 2006
@@ -71,7 +71,7 @@
.append("\n\t\t[-throughput]")
.append("\n\t\t[-failuredetect]")
.append("\n\t\t[-async]")
- .append("\n\t\t[-asyncsize maxqueuesizeinbytes]");
+ .append("\n\t\t[-asyncsize maxqueuesizeinkilobytes]");
return buf;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org