You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/11/29 15:31:47 UTC
svn commit: r1040125 - in /activemq/activemq-apollo/trunk:
apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/
apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/
Author: chirino
Date: Mon Nov 29 14:31:47 2010
New Revision: 1040125
URL: http://svn.apache.org/viewvc?rev=1040125&view=rev
Log:
Backing out the use of HIGH priority queues. The real fix for the fairness problem was in the transport: it now processes only up to 64k worth of data in a single execution loop so that other connections get a chance at getting processed.
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/TcpTransport.java
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala?rev=1040125&r1=1040124&r2=1040125&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala Mon Nov 29 14:31:47 2010
@@ -172,7 +172,7 @@ class Broker() extends BaseService with
var connectors: List[Connector] = Nil
- val dispatchQueue = getGlobalQueue(DispatchPriority.HIGH).createQueue("broker");
+ val dispatchQueue = createQueue("broker") // getGlobalQueue(DispatchPriority.HIGH).createQueue("broker")
if( STICK_ON_THREAD_QUEUES ) {
dispatchQueue.setTargetQueue(Dispatch.getRandomThreadQueue)
}
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala?rev=1040125&r1=1040124&r2=1040125&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala Mon Nov 29 14:31:47 2010
@@ -74,7 +74,7 @@ class VirtualHost(val broker: Broker, va
import VirtualHost._
override protected def log = VirtualHost
- override val dispatchQueue:DispatchQueue = getGlobalQueue(DispatchPriority.HIGH).createQueue("virtual-host");
+ override val dispatchQueue:DispatchQueue = createQueue("virtual-host") // getGlobalQueue(DispatchPriority.HIGH).createQueue("virtual-host")
var config:VirtualHostDTO = _
val router = new Router(this)
Modified: activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/TcpTransport.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/TcpTransport.java?rev=1040125&r1=1040124&r2=1040125&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/TcpTransport.java (original)
+++ activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/TcpTransport.java Mon Nov 29 14:31:47 2010
@@ -410,21 +410,26 @@ public class TcpTransport extends JavaBa
return;
}
try {
- Object command = codec.read();
- while ( command!=null ) {
- try {
- listener.onTransportCommand(command);
- } catch (Throwable e) {
- e.printStackTrace();
- onTransportFailure(new IOException("Transport listener failure."));
- }
-
- // the transport may be suspended after processing a command.
- if (getServiceState() == STOPPED || readSource.isSuspended()) {
+ long initial = codec.getReadCounter();
+ // Only process upto 64k worth of data at a time so we can give
+ // other connections a chance to process their requests.
+ while( codec.getReadCounter()-initial < 1024*64 ) {
+ Object command = codec.read();
+ if ( command!=null ) {
+ try {
+ listener.onTransportCommand(command);
+ } catch (Throwable e) {
+ e.printStackTrace();
+ onTransportFailure(new IOException("Transport listener failure."));
+ }
+
+ // the transport may be suspended after processing a command.
+ if (getServiceState() == STOPPED || readSource.isSuspended()) {
+ return;
+ }
+ } else {
return;
}
-
- command = codec.read();
}
} catch (IOException e) {
onTransportFailure(e);