You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/12/04 11:17:11 UTC
svn commit: r887149 - in
/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src:
main/java/org/apache/activemq/dispatch/internal/simple/
test/java/org/apache/activemq/dispatch/
Author: chirino
Date: Fri Dec 4 10:17:11 2009
New Revision: 887149
URL: http://svn.apache.org/viewvc?rev=887149&view=rev
Log:
Optimize the thread-local enqueue case.
Modified:
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/DispatcherThread.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/ThreadDispatchQueue.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/DispatchSystemTest.java
Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/DispatcherThread.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/DispatcherThread.java?rev=887149&r1=887148&r2=887149&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/DispatcherThread.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/DispatcherThread.java Fri Dec 4 10:17:11 2009
@@ -30,7 +30,7 @@
private final SimpleDispatchSPI spi;
private final ThreadDispatchQueue[] threadQueues;
final AtomicLong threadQueuedRunnables = new AtomicLong();
-
+
public DispatcherThread(SimpleDispatchSPI spi, int ordinal) {
this.spi = spi;
this.threadQueues = new ThreadDispatchQueue[3];
Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/ThreadDispatchQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/ThreadDispatchQueue.java?rev=887149&r1=887148&r2=887149&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/ThreadDispatchQueue.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/ThreadDispatchQueue.java Fri Dec 4 10:17:11 2009
@@ -16,6 +16,7 @@
*/
package org.apache.activemq.dispatch.internal.simple;
+import java.util.LinkedList;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;
@@ -30,6 +31,7 @@
public class ThreadDispatchQueue implements SimpleQueue {
final String label;
+ final LinkedList<Runnable> localRunnables = new LinkedList<Runnable>();
final ConcurrentLinkedQueue<Runnable> runnables = new ConcurrentLinkedQueue<Runnable>();
private DispatcherThread dispatcher;
final AtomicLong counter;
@@ -47,13 +49,30 @@
}
public void dispatchAsync(Runnable runnable) {
- counter.incrementAndGet();
- runnables.add(runnable);
- dispatcher.wakeup();
+ // We don't have to take the synchronization hit
+ // if the current thread is the dispatcher since we know it's not
+ // waiting.
+ if( Thread.currentThread()!=dispatcher ) {
+ counter.incrementAndGet();
+ runnables.add(runnable);
+ dispatcher.wakeup();
+ } else {
+ localRunnables.add(runnable);
+ }
}
public Runnable poll() {
- Runnable rc = runnables.poll();
+
+ // This method should only be called by our dispatcher
+ // thread.
+ assert Thread.currentThread()==dispatcher;
+
+ Runnable rc = localRunnables.poll();
+ if( rc !=null ) {
+ return rc;
+ }
+
+ rc = runnables.poll();
if( rc !=null ) {
counter.decrementAndGet();
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/DispatchSystemTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/DispatchSystemTest.java?rev=887149&r1=887148&r2=887149&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/DispatchSystemTest.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/DispatchSystemTest.java Fri Dec 4 10:17:11 2009
@@ -32,9 +32,9 @@
public class DispatchSystemTest {
public static void main(String[] args) throws Exception {
- DispatchSPI advancedSystem = new AdancedDispatchSPI(Runtime.getRuntime().availableProcessors());
- benchmark("advanced global queue", advancedSystem, advancedSystem.getGlobalQueue(DEFAULT));
- benchmark("advanced private serial queue", advancedSystem, advancedSystem.createQueue("test"));
+// DispatchSPI advancedSystem = new AdancedDispatchSPI(Runtime.getRuntime().availableProcessors());
+// benchmark("advanced global queue", advancedSystem, advancedSystem.getGlobalQueue(DEFAULT));
+// benchmark("advanced private serial queue", advancedSystem, advancedSystem.createQueue("test"));
DispatchSPI simpleSystem = new SimpleDispatchSPI(Runtime.getRuntime().availableProcessors());
benchmark("simple global queue", simpleSystem, simpleSystem.getGlobalQueue(DEFAULT));