You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/12/04 11:17:11 UTC

svn commit: r887149 - in /activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src: main/java/org/apache/activemq/dispatch/internal/simple/ test/java/org/apache/activemq/dispatch/

Author: chirino
Date: Fri Dec  4 10:17:11 2009
New Revision: 887149

URL: http://svn.apache.org/viewvc?rev=887149&view=rev
Log:
optimize the thread local enqueue case.

Modified:
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/DispatcherThread.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/ThreadDispatchQueue.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/DispatchSystemTest.java

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/DispatcherThread.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/DispatcherThread.java?rev=887149&r1=887148&r2=887149&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/DispatcherThread.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/DispatcherThread.java Fri Dec  4 10:17:11 2009
@@ -30,7 +30,7 @@
     private final SimpleDispatchSPI spi;
     private final ThreadDispatchQueue[] threadQueues;
     final AtomicLong threadQueuedRunnables = new AtomicLong();
-
+    
     public DispatcherThread(SimpleDispatchSPI spi, int ordinal) {
         this.spi = spi;
         this.threadQueues = new ThreadDispatchQueue[3];

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/ThreadDispatchQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/ThreadDispatchQueue.java?rev=887149&r1=887148&r2=887149&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/ThreadDispatchQueue.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/ThreadDispatchQueue.java Fri Dec  4 10:17:11 2009
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.dispatch.internal.simple;
 
+import java.util.LinkedList;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -30,6 +31,7 @@
 public class ThreadDispatchQueue implements SimpleQueue {
 
     final String label;
+    final LinkedList<Runnable> localRunnables = new LinkedList<Runnable>();
     final ConcurrentLinkedQueue<Runnable> runnables = new ConcurrentLinkedQueue<Runnable>();
     private DispatcherThread dispatcher;
     final AtomicLong counter;
@@ -47,13 +49,30 @@
     }
 
     public void dispatchAsync(Runnable runnable) {
-        counter.incrementAndGet();
-        runnables.add(runnable);
-        dispatcher.wakeup();
+        // We don't have to take the synchronization hit 
+        // if the current thread is the dispatcher since we know it's not
+        // waiting.
+        if( Thread.currentThread()!=dispatcher ) {
+            counter.incrementAndGet();
+            runnables.add(runnable);
+            dispatcher.wakeup();
+        } else {
+            localRunnables.add(runnable);
+        }
     }
 
     public Runnable poll() {
-        Runnable rc = runnables.poll();
+        
+        // This method should only be called by our dispatcher 
+        // thread.
+        assert Thread.currentThread()==dispatcher;
+        
+        Runnable rc = localRunnables.poll();
+        if( rc !=null ) {
+            return rc;
+        }
+        
+        rc = runnables.poll();
         if( rc !=null ) {
             counter.decrementAndGet();
         }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/DispatchSystemTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/DispatchSystemTest.java?rev=887149&r1=887148&r2=887149&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/DispatchSystemTest.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/DispatchSystemTest.java Fri Dec  4 10:17:11 2009
@@ -32,9 +32,9 @@
 public class DispatchSystemTest {
     
     public static void main(String[] args) throws Exception {
-        DispatchSPI advancedSystem = new AdancedDispatchSPI(Runtime.getRuntime().availableProcessors());
-        benchmark("advanced global queue", advancedSystem, advancedSystem.getGlobalQueue(DEFAULT));
-        benchmark("advanced private serial queue", advancedSystem, advancedSystem.createQueue("test"));
+//        DispatchSPI advancedSystem = new AdancedDispatchSPI(Runtime.getRuntime().availableProcessors());
+//        benchmark("advanced global queue", advancedSystem, advancedSystem.getGlobalQueue(DEFAULT));
+//        benchmark("advanced private serial queue", advancedSystem, advancedSystem.createQueue("test"));
 
         DispatchSPI simpleSystem = new SimpleDispatchSPI(Runtime.getRuntime().availableProcessors());
         benchmark("simple global queue", simpleSystem, simpleSystem.getGlobalQueue(DEFAULT));