You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/11/29 15:31:47 UTC

svn commit: r1040125 - in /activemq/activemq-apollo/trunk: apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/

Author: chirino
Date: Mon Nov 29 14:31:47 2010
New Revision: 1040125

URL: http://svn.apache.org/viewvc?rev=1040125&view=rev
Log:
Backing out the use of HIGH priority queues. The real fix for the fairness problem was in the transport: it now processes only up to 64k worth of data in a single execution loop so that other connections get a chance at getting processed.

Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
    activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/TcpTransport.java

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala?rev=1040125&r1=1040124&r2=1040125&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala Mon Nov 29 14:31:47 2010
@@ -172,7 +172,7 @@ class Broker() extends BaseService with 
 
   var connectors: List[Connector] = Nil
 
-  val dispatchQueue = getGlobalQueue(DispatchPriority.HIGH).createQueue("broker");
+  val dispatchQueue = createQueue("broker") // getGlobalQueue(DispatchPriority.HIGH).createQueue("broker")
   if( STICK_ON_THREAD_QUEUES ) {
     dispatchQueue.setTargetQueue(Dispatch.getRandomThreadQueue)
   }

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala?rev=1040125&r1=1040124&r2=1040125&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala Mon Nov 29 14:31:47 2010
@@ -74,7 +74,7 @@ class VirtualHost(val broker: Broker, va
   import VirtualHost._
   
   override protected def log = VirtualHost
-  override val dispatchQueue:DispatchQueue = getGlobalQueue(DispatchPriority.HIGH).createQueue("virtual-host");
+  override val dispatchQueue:DispatchQueue = createQueue("virtual-host") // getGlobalQueue(DispatchPriority.HIGH).createQueue("virtual-host")
 
   var config:VirtualHostDTO = _
   val router = new Router(this)

Modified: activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/TcpTransport.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/TcpTransport.java?rev=1040125&r1=1040124&r2=1040125&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/TcpTransport.java (original)
+++ activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/TcpTransport.java Mon Nov 29 14:31:47 2010
@@ -410,21 +410,26 @@ public class TcpTransport extends JavaBa
             return;
         }
         try {
-            Object command = codec.read();
-            while ( command!=null ) {
-                try {
-                    listener.onTransportCommand(command);
-                } catch (Throwable e) {
-                    e.printStackTrace();
-                    onTransportFailure(new IOException("Transport listener failure."));
-                }
-
-                // the transport may be suspended after processing a command.
-                if (getServiceState() == STOPPED || readSource.isSuspended()) {
+            long initial = codec.getReadCounter();
+            // Only process upto 64k worth of data at a time so we can give
+            // other connections a chance to process their requests.
+            while( codec.getReadCounter()-initial < 1024*64 ) {
+                Object command = codec.read();
+                if ( command!=null ) {
+                    try {
+                        listener.onTransportCommand(command);
+                    } catch (Throwable e) {
+                        e.printStackTrace();
+                        onTransportFailure(new IOException("Transport listener failure."));
+                    }
+
+                    // the transport may be suspended after processing a command.
+                    if (getServiceState() == STOPPED || readSource.isSuspended()) {
+                        return;
+                    }
+                } else {
                     return;
                 }
-
-                command = codec.read();
             }
         } catch (IOException e) {
             onTransportFailure(e);