You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/12/06 20:54:04 UTC

svn commit: r887755 - in /activemq/sandbox/activemq-apollo-actor: activemq-broker/src/test/java/org/apache/activemq/broker/ activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/ activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/i...

Author: chirino
Date: Sun Dec  6 19:54:02 2009
New Revision: 887755

URL: http://svn.apache.org/viewvc?rev=887755&view=rev
Log:
Adding a DispatchOption enum to control the thread 'sticky' factor for serial queues.


Added:
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchOption.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/AbstractSerialDispatchQueue.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/SerialDispatchQueue.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SerialDispatchQueue.java
Removed:
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/SerialDispatchQueue.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdvancedSerialDispatchQueue.java
Modified:
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/RemoteProducer.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchQueue.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchSystem.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/Dispatcher.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatcherObserver.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdvancedDispatcher.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/GlobalDispatchQueue.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/ThreadDispatchQueue.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/DispatcherThread.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/GlobalDispatchQueue.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleDispatcher.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleQueue.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/ThreadDispatchQueue.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/TimerThread.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/actor/ActorTest.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/DispatchSystemTest.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/internal/advanced/DispatcherPoolTest.java
    activemq/sandbox/activemq-apollo-actor/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java
    activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/AbstractFlowQueue.java
    activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/RemoteProducer.java
    activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/RemoteProducer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/RemoteProducer.java?rev=887755&r1=887754&r2=887755&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/RemoteProducer.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/RemoteProducer.java Sun Dec  6 19:54:02 2009
@@ -15,6 +15,8 @@
 import org.apache.activemq.metric.MetricCounter;
 import org.apache.activemq.transport.TransportFactory;
 
+import static org.apache.activemq.dispatch.DispatchOption.*;
+
 abstract public class RemoteProducer extends Connection implements FlowUnblockListener<MessageDelivery> {
 
     protected final MetricCounter rate = new MetricCounter();
@@ -59,7 +61,7 @@
         
         setupProducer();
         
-        dispatchQueue = getDispatcher().createSerialQueue(name + "-client");
+        dispatchQueue = getDispatcher().createSerialQueue(name + "-client", STICK_TO_CALLER_THREAD);
         dispatchTask = new Runnable(){
             public void run() {
                 dispatch();

Added: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchOption.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchOption.java?rev=887755&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchOption.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchOption.java Sun Dec  6 19:54:02 2009
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.dispatch;
+
+public enum DispatchOption { // options accepted when a serial queue is created
+    /**
+     * Updates the queue's target queue to be the caller's
+     * thread queue, so that execution 'sticks' to the
+     * caller's thread queue.
+     */
+    STICK_TO_CALLER_THREAD,
+    
+    /**
+     * Updates the queue's target queue to be the first
+     * (arbitrary) thread queue that dispatches this queue.
+     */
+    STICK_TO_DISPATCH_THREAD, 
+}
\ No newline at end of file

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchQueue.java?rev=887755&r1=887754&r2=887755&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchQueue.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchQueue.java Sun Dec  6 19:54:02 2009
@@ -16,6 +16,8 @@
  */
 package org.apache.activemq.dispatch;
 
+import java.util.EnumSet;
+import java.util.Set;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 
@@ -31,7 +33,7 @@
     public void dispatchAfter(Runnable runnable, long delay, TimeUnit unit);
     public void dispatchApply(int iterations, Runnable runnable) throws InterruptedException;
     
-    String getLabel();
-
+    public String getLabel();
+    public Set<DispatchOption> getOptions();
                       
 }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchSystem.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchSystem.java?rev=887755&r1=887754&r2=887755&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchSystem.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchSystem.java Sun Dec  6 19:54:02 2009
@@ -18,7 +18,9 @@
 
 import java.nio.channels.SelectableChannel;
 
+
 /**
+ * Provides easy access to a system wide Dispatcher.
  * 
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
@@ -27,9 +29,11 @@
     final private static Dispatcher dispatcher = create();
 
     private static Dispatcher create() {
-        return new DispatcherConfig().createDispatcher();
+        Dispatcher rc = new DispatcherConfig().createDispatcher();
+        rc.retain();
+        return rc;
     }
-
+    
     static DispatchQueue getMainQueue() {
         return dispatcher.getMainQueue();
     }
@@ -42,8 +46,8 @@
         return dispatcher.getGlobalQueue(priority);
     }
     
-    static DispatchQueue getSerialQueue(String label) {
-        return dispatcher.createSerialQueue(label);
+    static DispatchQueue getSerialQueue(String label, DispatchOption...options) {
+        return dispatcher.createSerialQueue(label, options);
     }
     
     static void dispatchMain() {

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/Dispatcher.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/Dispatcher.java?rev=887755&r1=887754&r2=887755&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/Dispatcher.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/Dispatcher.java Sun Dec  6 19:54:02 2009
@@ -21,11 +21,11 @@
 
 
 public interface Dispatcher extends Retained {
-        
+
     public DispatchQueue getGlobalQueue();
     public DispatchQueue getGlobalQueue(DispatchPriority priority);
     
-    public DispatchQueue createSerialQueue(String label);
+    public DispatchQueue createSerialQueue(String label, DispatchOption... options);
     
     public DispatchQueue getMainQueue();
     public void dispatchMain();

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatcherObserver.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatcherObserver.java?rev=887755&r1=887754&r2=887755&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatcherObserver.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatcherObserver.java Sun Dec  6 19:54:02 2009
@@ -24,7 +24,7 @@
     public void onThreadCreate(DispatcherThread thread);
     public void onThreadDestroy(DispatcherThread thread);
 
-    public void onQueueCreate(DispatchQueue queue);
+    public void onQueueCreate(DispatchQueue queue, DispatchOption...options);
     public void onQueueDestroy(DispatchQueue queue);
     
     public void onSourceCreate(DispatchSource source);

Added: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/AbstractSerialDispatchQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/AbstractSerialDispatchQueue.java?rev=887755&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/AbstractSerialDispatchQueue.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/AbstractSerialDispatchQueue.java Sun Dec  6 19:54:02 2009
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.dispatch.internal;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.activemq.dispatch.DispatchQueue;
+import org.apache.activemq.dispatch.DispatchOption;
+
+/**
+ * Base class for serial dispatch queues: submitted Runnables are buffered in a FIFO and drained one at a time by {@link #run()}.
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+abstract public class AbstractSerialDispatchQueue extends AbstractDispatchObject implements DispatchQueue, Runnable {
+
+    private final String label;
+    private final AtomicInteger suspendCounter = new AtomicInteger(); // incremented by suspend(), decremented by resume(); work proceeds only while <= 0
+    private final ConcurrentLinkedQueue<Runnable> runnables = new ConcurrentLinkedQueue<Runnable>(); // pending tasks in submission order
+    private final AtomicLong size = new AtomicLong(); // tasks submitted but not yet completed: incremented in dispatchAsync(), decremented in run()
+    private final Set<DispatchOption> options; // immutable, see set()
+
+    public AbstractSerialDispatchQueue(String label, DispatchOption...options) { // label: value returned by getLabel(); options: may be null or empty
+        this.label = label;
+        this.options = set(options);
+        retain(); // NOTE(review): retain() is inherited (not shown here); presumably the creator's initial reference - confirm
+    }
+
+    static private Set<DispatchOption> set(DispatchOption[] options) { // converts the varargs array into an immutable option set
+        if( options==null || options.length==0 ) // guard: EnumSet.copyOf(Collection) rejects an empty non-EnumSet collection
+            return Collections.emptySet() ;
+        return Collections.unmodifiableSet(EnumSet.copyOf(Arrays.asList(options)));
+    }
+
+    public String getLabel() { // returns the label given at construction
+        return label;
+    }
+
+    public void resume() { // undoes one suspend(); reschedules this queue when the count reaches zero and work is pending
+        if( suspendCounter.decrementAndGet() == 0 ) {
+            if( size.get() != 0 ) {
+                dispatchSelfAsync();
+            }
+        }
+    }
+
+    public void suspend() { // run() stops taking tasks once it observes a positive count
+        suspendCounter.incrementAndGet();
+    }
+
+    public void dispatchAfter(Runnable runnable, long delay, TimeUnit unit) { // not implemented: always throws RuntimeException
+        throw new RuntimeException("TODO: implement me.");
+    }
+
+    public void execute(Runnable command) { // same as dispatchAsync(command)
+        dispatchAsync(command);
+    }
+
+    public void dispatchAsync(Runnable runnable) { // enqueues the task; throws IllegalArgumentException when runnable is null
+        if( runnable == null ) {
+            throw new IllegalArgumentException();
+        }
+        long lastSize = size.getAndIncrement(); // size is bumped before the task is added to the queue
+        if( lastSize==0 ) {
+            retain(); // empty -> non-empty: take a reference, matched by the release() in run() when size drops back to 0
+        }
+        runnables.add(runnable);
+        if( lastSize == 0 && suspendCounter.get()<=0 ) { // only the empty -> non-empty transition schedules this queue, and only when not suspended
+            dispatchSelfAsync();
+        }
+    }
+
+    protected void dispatchSelfAsync() { // submits this queue (as a Runnable) to targetQueue, a field inherited from the superclass (not shown here)
+        targetQueue.dispatchAsync(this);
+    }
+
+    public void run() { // drains pending tasks until size reaches 0 or the queue is suspended
+        Runnable runnable;
+        long lsize = size.get();
+        while( suspendCounter.get() <= 0 && lsize > 0 ) {
+            try {
+                runnable = runnables.poll();
+                if( runnable!=null ) { // poll() can return null while size > 0: dispatchAsync() increments size before add()
+                    runnable.run();
+                    lsize = size.decrementAndGet();
+                    if( lsize==0 ) {
+                        release(); // matches the retain() taken in dispatchAsync() on the empty -> non-empty transition
+                    }
+                }
+            } catch (Throwable e) { // NOTE(review): if runnable.run() throws, size is not decremented for that task, so lsize can stay > 0 with an empty queue and this loop would keep spinning - confirm
+                e.printStackTrace();
+            }
+        }
+    }
+
+    public void dispatchSync(Runnable runnable) throws InterruptedException { // delegates to dispatchApply with a single iteration
+        dispatchApply(1, runnable);
+    }
+    
+    public void dispatchApply(int iterations, Runnable runnable) throws InterruptedException { // delegates to QueueSupport.dispatchApply (not shown here)
+        QueueSupport.dispatchApply(this, iterations, runnable);
+    }
+
+    public Set<DispatchOption> getOptions() { // returns the immutable set built by set() at construction
+        return options;
+    }
+
+    
+}

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdvancedDispatcher.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdvancedDispatcher.java?rev=887755&r1=887754&r2=887755&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdvancedDispatcher.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdvancedDispatcher.java Sun Dec  6 19:54:02 2009
@@ -24,13 +24,13 @@
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
-import org.apache.activemq.dispatch.Dispatcher;
+import org.apache.activemq.dispatch.DispatchOption;
 import org.apache.activemq.dispatch.DispatchPriority;
 import org.apache.activemq.dispatch.DispatchQueue;
 import org.apache.activemq.dispatch.DispatchSource;
+import org.apache.activemq.dispatch.Dispatcher;
 import org.apache.activemq.dispatch.DispatcherConfig;
 import org.apache.activemq.dispatch.internal.BaseRetained;
-import org.apache.activemq.dispatch.internal.SerialDispatchQueue;
 
 import static org.apache.activemq.dispatch.DispatchPriority.*;
 
@@ -183,8 +183,8 @@
         return globalQueues[priority.ordinal()];
     }
     
-    public DispatchQueue createSerialQueue(String label) {
-        AdvancedSerialDispatchQueue rc = new AdvancedSerialDispatchQueue(label);
+    public DispatchQueue createSerialQueue(String label, DispatchOption... options) {
+        SerialDispatchQueue rc = new SerialDispatchQueue(label, options);
         rc.setTargetQueue(getGlobalQueue());
         return rc;
     }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/GlobalDispatchQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/GlobalDispatchQueue.java?rev=887755&r1=887754&r2=887755&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/GlobalDispatchQueue.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/GlobalDispatchQueue.java Sun Dec  6 19:54:02 2009
@@ -16,8 +16,12 @@
  */
 package org.apache.activemq.dispatch.internal.advanced;
 
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.activemq.dispatch.DispatchOption;
 import org.apache.activemq.dispatch.DispatchQueue;
 import org.apache.activemq.dispatch.DispatchPriority;
 import org.apache.activemq.dispatch.internal.QueueSupport;
@@ -95,4 +99,9 @@
 
     public void retain() {
     }
+    
+    public Set<DispatchOption> getOptions() { // this queue type carries no DispatchOptions
+        return Collections.emptySet(); // always the shared immutable empty set
+    }
+
 }

Added: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/SerialDispatchQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/SerialDispatchQueue.java?rev=887755&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/SerialDispatchQueue.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/SerialDispatchQueue.java Sun Dec  6 19:54:02 2009
@@ -0,0 +1,32 @@
+package org.apache.activemq.dispatch.internal.advanced;
+
+import org.apache.activemq.dispatch.DispatchOption;
+import org.apache.activemq.dispatch.DispatchQueue;
+import org.apache.activemq.dispatch.internal.AbstractSerialDispatchQueue;
+
+public class SerialDispatchQueue extends AbstractSerialDispatchQueue { // serial queue type instantiated by AdvancedDispatcher.createSerialQueue
+
+    public SerialDispatchQueue(String label, DispatchOption...options) { // forwards label and options unchanged to the base class
+        super(label, options);
+//        context = new DispatchContext(this, true, label);
+}
+    
+//    @Override
+//    protected void dispatchSelfAsync() {
+//        AdvancedQueue aq = ((AdvancedQueue)targetQueue);
+//        super.dispatchSelfAsync();
+//    }
+    
+    @Override
+    public void run() { // runs the base-class drain loop with this queue published as AdvancedDispatcher.CURRENT_QUEUE
+        DispatchQueue original = AdvancedDispatcher.CURRENT_QUEUE.get(); // remember the previous value so it can be restored
+        AdvancedDispatcher.CURRENT_QUEUE.set(this);
+        try {
+            super.run();
+        } finally {
+            AdvancedDispatcher.CURRENT_QUEUE.set(original); // restored even if super.run() throws
+        }
+    }
+    
+
+}

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/ThreadDispatchQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/ThreadDispatchQueue.java?rev=887755&r1=887754&r2=887755&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/ThreadDispatchQueue.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/ThreadDispatchQueue.java Sun Dec  6 19:54:02 2009
@@ -16,8 +16,12 @@
  */
 package org.apache.activemq.dispatch.internal.advanced;
 
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.activemq.dispatch.DispatchOption;
 import org.apache.activemq.dispatch.DispatchQueue;
 import org.apache.activemq.dispatch.DispatchPriority;
 import org.apache.activemq.dispatch.internal.QueueSupport;
@@ -95,5 +99,10 @@
 
     public void retain() {
     }
+    
+    public Set<DispatchOption> getOptions() { // this queue type carries no DispatchOptions
+        return Collections.emptySet(); // always the shared immutable empty set
+    }
+
 
 }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/DispatcherThread.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/DispatcherThread.java?rev=887755&r1=887754&r2=887755&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/DispatcherThread.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/DispatcherThread.java Sun Dec  6 19:54:02 2009
@@ -19,8 +19,6 @@
 
 import java.util.concurrent.atomic.AtomicLong;
 
-import org.apache.activemq.dispatch.DispatchPriority;
-
 /**
  * 
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -33,9 +31,9 @@
         
     public DispatcherThread(SimpleDispatcher dispatcher, int ordinal) {
         this.dispatcher = dispatcher;
-        this.threadQueues = new ThreadDispatchQueue[3];
-        for (int i = 0; i < 3; i++) {
-            threadQueues[i] = new ThreadDispatchQueue(this, DispatchPriority.values()[i] );
+        this.threadQueues = new ThreadDispatchQueue[dispatcher.globalQueues.length];
+        for (int i = 0; i < threadQueues.length; i++) {
+            threadQueues[i] = new ThreadDispatchQueue(this, dispatcher.globalQueues[i]);
         }
         setName(dispatcher.getLabel()+" dispatcher: "+(ordinal+1));
         setDaemon(true);
@@ -43,11 +41,12 @@
     
     @Override
     public void run() {
+        GlobalDispatchQueue[] globalQueues = dispatcher.globalQueues;
         try {
             outer: while( true ) {
                 int counter=0;
-                for (SimpleQueue queue : threadQueues) {
-                    SimpleDispatcher.CURRENT_QUEUE.set(queue);
+                for (ThreadDispatchQueue queue : threadQueues) {
+                    SimpleDispatcher.CURRENT_QUEUE.set(queue.globalQueue);
                     Runnable runnable;
                     while( (runnable = queue.poll())!=null ) {
                         dispatch(runnable);
@@ -60,8 +59,8 @@
                     continue;
                 }
                 
-                for (SimpleQueue queue : dispatcher.globalQueues) {
-                    SimpleDispatcher.CURRENT_QUEUE.set(threadQueues[queue.getPriority().ordinal()]);
+                for (SimpleQueue queue : globalQueues) {
+                    SimpleDispatcher.CURRENT_QUEUE.set(queue);
                     
                     Runnable runnable;
                     while( (runnable = queue.poll())!=null ) {
@@ -134,6 +133,14 @@
             e.printStackTrace();
         }
     }
+    
+    public static DispatcherThread currentDispatcherThread() { // returns the calling thread as a DispatcherThread, or null if it is not one
+        Thread currentThread = Thread.currentThread();
+        if( currentThread.getClass() == DispatcherThread.class ) { // exact class comparison: a subclass of DispatcherThread would not match
+            return (DispatcherThread) currentThread;
+        }
+        return null; // caller is not running on a DispatcherThread
+    }
 
     private final Object wakeupMutex = new Object();
     private boolean inWaitingList;

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/GlobalDispatchQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/GlobalDispatchQueue.java?rev=887755&r1=887754&r2=887755&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/GlobalDispatchQueue.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/GlobalDispatchQueue.java Sun Dec  6 19:54:02 2009
@@ -16,10 +16,14 @@
  */
 package org.apache.activemq.dispatch.internal.simple;
 
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.activemq.dispatch.DispatchOption;
 import org.apache.activemq.dispatch.DispatchQueue;
 import org.apache.activemq.dispatch.DispatchPriority;
 import org.apache.activemq.dispatch.internal.QueueSupport;
@@ -28,7 +32,7 @@
  * 
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-public class GlobalDispatchQueue implements SimpleQueue {
+final public class GlobalDispatchQueue implements SimpleQueue {
 
     private final SimpleDispatcher dispatcher;
     final String label;
@@ -69,6 +73,32 @@
         QueueSupport.dispatchApply(this, iterations, runnable);
     }
 
+    public ThreadDispatchQueue getTargetQueue() { // result depends on the calling thread, not on stored state
+        DispatcherThread thread = DispatcherThread.currentDispatcherThread();
+        if( thread == null ) {
+            return null; // not called from a DispatcherThread: there is no thread queue to return
+        }
+        return thread.threadQueues[priority.ordinal()]; // the calling thread's queue at this queue's priority index
+    }
+    
+    public Runnable poll() { // dequeues the next runnable, or returns null when none is queued
+        Runnable rc = runnables.poll();
+        if( rc !=null ) {
+            counter.decrementAndGet(); // counter is declared outside this hunk; decremented once per runnable actually dequeued
+        }
+        return rc;
+    }
+
+    public DispatchPriority getPriority() { // returns this queue's priority field (assigned outside this hunk)
+        return priority;
+    }
+
+    public void release() { // no-op: empty body
+    }
+
+    public void retain() { // no-op: empty body
+    }
+
     public void resume() {
         throw new UnsupportedOperationException();
     }
@@ -93,25 +123,20 @@
         throw new UnsupportedOperationException();
     }
 
-    public DispatchQueue getTargetQueue() {
-        throw new UnsupportedOperationException();
-    }
-    
-    public Runnable poll() {
-        Runnable rc = runnables.poll();
-        if( rc !=null ) {
-            counter.decrementAndGet();
-        }
-        return rc;
+    public Set<DispatchOption> getOptions() {
+        return Collections.emptySet();
     }
 
-    public DispatchPriority getPriority() {
-        return priority;
+    public GlobalDispatchQueue isGlobalDispatchQueue() {
+        return this;
     }
 
-    public void release() {
+    public SerialDispatchQueue isSerialDispatchQueue() {
+        return null;
     }
 
-    public void retain() {
+    public ThreadDispatchQueue isThreadDispatchQueue() {
+        return null;
     }
+
 }

Added: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SerialDispatchQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SerialDispatchQueue.java?rev=887755&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SerialDispatchQueue.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SerialDispatchQueue.java Sun Dec  6 19:54:02 2009
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.dispatch.internal.simple;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.dispatch.DispatchOption;
+import org.apache.activemq.dispatch.DispatchPriority;
+import org.apache.activemq.dispatch.DispatchQueue;
+import org.apache.activemq.dispatch.internal.AbstractSerialDispatchQueue;
+
+/**
+ * Serial dispatch queue implementation used by the {@link SimpleDispatcher}.
+ * <p>
+ * A queue created with {@link DispatchOption#STICK_TO_CALLER_THREAD} that is
+ * targeted at a global queue re-targets itself, on its next dispatch request,
+ * to the thread queue at the root of the calling thread's current queue.
+ */
+public final class SerialDispatchQueue extends AbstractSerialDispatchQueue implements SimpleQueue {
+
+    private final SimpleDispatcher dispatcher;
+    // NOTE(review): nothing in this class sets this flag to true, so the branch
+    // it guards in run() is currently never taken -- confirm it is still needed.
+    private volatile boolean stickToThreadOnNextDispatch;
+    // Armed by setTargetQueue(), consumed by dispatchSelfAsync().
+    private volatile boolean stickToThreadOnNextDispatchRequest;
+
+    SerialDispatchQueue(SimpleDispatcher dispatcher, String label, DispatchOption...options) {
+        super(label, options);
+        this.dispatcher = dispatcher;
+    }
+
+    /**
+     * Sets the target queue. If this queue has the
+     * {@link DispatchOption#STICK_TO_CALLER_THREAD} option and the new target is
+     * a global queue, a request to stick to the caller's thread queue is armed
+     * for the next dispatch.
+     */
+    @Override
+    public void setTargetQueue(DispatchQueue targetQueue) {
+        GlobalDispatchQueue global = ((SimpleQueue)targetQueue).isGlobalDispatchQueue();
+        if( getOptions().contains(DispatchOption.STICK_TO_CALLER_THREAD) && global!=null ) {
+            stickToThreadOnNextDispatchRequest=true;
+        }
+        super.setTargetQueue(targetQueue);
+    }
+
+    /**
+     * Registers the runnable with the dispatcher's timer thread, passing this
+     * queue and the relative delay.
+     */
+    @Override
+    public void dispatchAfter(Runnable runnable, long delay, TimeUnit unit) {
+        dispatcher.timerThread.addRelative(runnable, this, delay, unit);
+    }
+
+    /**
+     * Before delegating to the superclass, honours a pending stick request: the
+     * root of the calling thread's current queue chain becomes this queue's
+     * target, provided that root is a thread queue. If the caller is not running
+     * on a queue, or the root is not a thread queue, the target is left alone
+     * and the request stays pending for a later dispatch.
+     */
+    @Override
+    protected void dispatchSelfAsync() {
+        if( stickToThreadOnNextDispatchRequest ) {
+            SimpleQueue root = SimpleDispatcher.CURRENT_QUEUE.get();
+            if( root!=null ) {
+                // Walk up the target chain to the queue that has no target.
+                SimpleQueue parent;
+                while( (parent = root.getTargetQueue()) !=null ) {
+                    root = parent;
+                }
+                // Only a thread queue is a valid place to stick to; re-targeting
+                // to any other root would not pin this queue to the caller thread.
+                if( root.isThreadDispatchQueue()!=null ) {
+                    super.setTargetQueue(root);
+                    stickToThreadOnNextDispatchRequest=false;
+                }
+            }
+        }
+        super.dispatchSelfAsync();
+    }
+
+    /**
+     * Runs the superclass logic with this queue published as the thread's
+     * current queue, restoring the previously published queue afterwards.
+     */
+    @Override
+    public void run() {
+        SimpleQueue current = SimpleDispatcher.CURRENT_QUEUE.get();
+        if( stickToThreadOnNextDispatch ) {
+            stickToThreadOnNextDispatch=false;
+            // The thread-local is null when the thread has published no current
+            // queue, so guard the dereference.
+            GlobalDispatchQueue global = current==null ? null : current.isGlobalDispatchQueue();
+            if( global!=null ) {
+                setTargetQueue(global.getTargetQueue());
+            }
+        }
+        SimpleDispatcher.CURRENT_QUEUE.set(this);
+        try {
+            super.run();
+        } finally {
+            SimpleDispatcher.CURRENT_QUEUE.set(current);
+        }
+    }
+
+    /** Not supported by this queue type; always throws {@link UnsupportedOperationException}. */
+    public DispatchPriority getPriority() {
+        throw new UnsupportedOperationException();
+    }
+
+    /** Not supported by this queue type; always throws {@link UnsupportedOperationException}. */
+    public Runnable poll() {
+        throw new UnsupportedOperationException();
+    }
+
+    /** @return always {@code null}: this is not a global queue. */
+    public GlobalDispatchQueue isGlobalDispatchQueue() {
+        return null;
+    }
+
+    /** @return this queue. */
+    public SerialDispatchQueue isSerialDispatchQueue() {
+        return this;
+    }
+
+    /** @return always {@code null}: this is not a thread queue. */
+    public ThreadDispatchQueue isThreadDispatchQueue() {
+        return null;
+    }
+
+    /** @return the current target queue, cast to {@link SimpleQueue}. */
+    public SimpleQueue getTargetQueue() {
+        return (SimpleQueue) targetQueue;
+    }
+}
\ No newline at end of file

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleDispatcher.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleDispatcher.java?rev=887755&r1=887754&r2=887755&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleDispatcher.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleDispatcher.java Sun Dec  6 19:54:02 2009
@@ -18,7 +18,6 @@
 
 import java.nio.channels.SelectableChannel;
 import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -27,8 +26,9 @@
 import org.apache.activemq.dispatch.Dispatcher;
 import org.apache.activemq.dispatch.DispatchSource;
 import org.apache.activemq.dispatch.DispatcherConfig;
+import org.apache.activemq.dispatch.DispatchOption;
 import org.apache.activemq.dispatch.internal.BaseRetained;
-import org.apache.activemq.dispatch.internal.SerialDispatchQueue;
+import org.apache.activemq.dispatch.internal.AbstractSerialDispatchQueue;
 
 import static org.apache.activemq.dispatch.DispatchPriority.*;
 
@@ -41,9 +41,9 @@
  */
 final public class SimpleDispatcher extends BaseRetained implements Dispatcher {
         
-    public final static ThreadLocal<DispatchQueue> CURRENT_QUEUE = new ThreadLocal<DispatchQueue>();
+    public final static ThreadLocal<SimpleQueue> CURRENT_QUEUE = new ThreadLocal<SimpleQueue>();
 
-    final SerialDispatchQueue mainQueue = new SerialDispatchQueue("main");
+    final SerialDispatchQueue mainQueue = new SerialDispatchQueue(this, "main");
     final GlobalDispatchQueue globalQueues[]; 
     final DispatcherThread dispatchers[];
     final AtomicLong globalQueuedRunnables = new AtomicLong();
@@ -74,13 +74,8 @@
         return globalQueues[priority.ordinal()];
     }
     
-    public DispatchQueue createSerialQueue(String label) {
-        SerialDispatchQueue rc = new SerialDispatchQueue(label) {
-            @Override
-            public void dispatchAfter(Runnable runnable, long delay, TimeUnit unit) {
-                timerThread.addRelative(runnable, this, delay, unit);
-            }
-        };
+    public DispatchQueue createSerialQueue(String label, DispatchOption... options) {
+        AbstractSerialDispatchQueue rc = new SerialDispatchQueue(this, label, options);
         rc.setTargetQueue(getGlobalQueue());
         return rc;
     }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleQueue.java?rev=887755&r1=887754&r2=887755&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleQueue.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleQueue.java Sun Dec  6 19:54:02 2009
@@ -7,4 +7,11 @@
 
     Runnable poll();
     DispatchPriority getPriority();
+    
+    SerialDispatchQueue isSerialDispatchQueue();
+    ThreadDispatchQueue isThreadDispatchQueue();
+    GlobalDispatchQueue isGlobalDispatchQueue();
+    
+    SimpleQueue getTargetQueue();
+
 }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/ThreadDispatchQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/ThreadDispatchQueue.java?rev=887755&r1=887754&r2=887755&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/ThreadDispatchQueue.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/ThreadDispatchQueue.java Sun Dec  6 19:54:02 2009
@@ -16,32 +16,35 @@
  */
 package org.apache.activemq.dispatch.internal.simple;
 
+import java.util.Collections;
 import java.util.LinkedList;
+import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
-import org.apache.activemq.dispatch.DispatchQueue;
+import org.apache.activemq.dispatch.DispatchOption;
 import org.apache.activemq.dispatch.DispatchPriority;
+import org.apache.activemq.dispatch.DispatchQueue;
 import org.apache.activemq.dispatch.internal.QueueSupport;
 
 /**
  * 
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-public class ThreadDispatchQueue implements SimpleQueue {
+final public class ThreadDispatchQueue implements SimpleQueue {
 
     final String label;
     final LinkedList<Runnable> localRunnables = new LinkedList<Runnable>();
     final ConcurrentLinkedQueue<Runnable> runnables = new ConcurrentLinkedQueue<Runnable>();
-    private DispatcherThread dispatcher;
+    final DispatcherThread dispatcher;
     final AtomicLong counter;
-    private final DispatchPriority priority;
+    final GlobalDispatchQueue globalQueue;
     
-    public ThreadDispatchQueue(DispatcherThread dispatcher, DispatchPriority priority) {
+    public ThreadDispatchQueue(DispatcherThread dispatcher, GlobalDispatchQueue globalQueue) {
         this.dispatcher = dispatcher;
-        this.priority = priority;
-        this.label=priority.toString();
+        this.globalQueue = globalQueue;
+        this.label="thread local "+globalQueue.getLabel();
         this.counter = dispatcher.threadQueuedRunnables;
     }
 
@@ -119,12 +122,13 @@
     public void setTargetQueue(DispatchQueue queue) {
         throw new UnsupportedOperationException();
     }
-    public DispatchQueue getTargetQueue() {
-        throw new UnsupportedOperationException();
+    
+    public SimpleQueue getTargetQueue() {
+        return null;
     }
     
     public DispatchPriority getPriority() {
-        return priority;
+        return globalQueue.getPriority();
     }
 
     public void release() {
@@ -133,4 +137,20 @@
     public void retain() {
     }
 
+    public Set<DispatchOption> getOptions() {
+        return Collections.emptySet();
+    }
+
+    public GlobalDispatchQueue isGlobalDispatchQueue() {
+        return null;
+    }
+
+    public SerialDispatchQueue isSerialDispatchQueue() {
+        return null;
+    }
+
+    public ThreadDispatchQueue isThreadDispatchQueue() {
+        return this;
+    }
+
 }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/TimerThread.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/TimerThread.java?rev=887755&r1=887754&r2=887755&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/TimerThread.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/TimerThread.java Sun Dec  6 19:54:02 2009
@@ -8,7 +8,7 @@
 
 import static org.apache.activemq.dispatch.internal.simple.TimerThread.Type.*;
 
-public class TimerThread extends Thread {
+final public class TimerThread extends Thread {
     enum Type {
         RELATIVE,
         ABSOLUTE,

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/actor/ActorTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/actor/ActorTest.java?rev=887755&r1=887754&r2=887755&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/actor/ActorTest.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/actor/ActorTest.java Sun Dec  6 19:54:02 2009
@@ -17,6 +17,8 @@
 import org.apache.activemq.dispatch.DispatcherConfig;
 import org.apache.activemq.dispatch.internal.advanced.AdvancedDispatcher;
 
+import static org.apache.activemq.dispatch.DispatchOption.*;
+
 
 /** 
  * ActorTest
@@ -49,7 +51,7 @@
         Dispatcher advancedSystem = new AdvancedDispatcher(new DispatcherConfig());
         advancedSystem.retain();
         
-        DispatchQueue queue = advancedSystem.createSerialQueue("test");
+        DispatchQueue queue = advancedSystem.createSerialQueue("test", STICK_TO_CALLER_THREAD);
         ActorTestObject testObject = Actor.create(new ActorTestObject(), queue);
         
         CountDownLatch latch = new CountDownLatch(1);

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/DispatchSystemTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/DispatchSystemTest.java?rev=887755&r1=887754&r2=887755&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/DispatchSystemTest.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/DispatchSystemTest.java Sun Dec  6 19:54:02 2009
@@ -36,7 +36,7 @@
         Dispatcher advancedSystem = new AdvancedDispatcher(new DispatcherConfig());
         advancedSystem.retain();
         benchmark("advanced global queue", advancedSystem, advancedSystem.getGlobalQueue(DEFAULT));
-        benchmark("advanced private serial queue", advancedSystem, advancedSystem.createSerialQueue("test"));
+        benchmark("advanced private serial queue", advancedSystem, advancedSystem.createSerialQueue("test", DispatchOption.STICK_TO_CALLER_THREAD));
 
         RunnableCountDownLatch latch = new RunnableCountDownLatch(1);
         advancedSystem.addShutdownWatcher(latch);
@@ -47,7 +47,7 @@
         simpleSystem.retain();
         
         benchmark("simple global queue", simpleSystem, simpleSystem.getGlobalQueue(DEFAULT));
-        benchmark("simple private serial queue", simpleSystem, simpleSystem.createSerialQueue("test"));
+        benchmark("simple private serial queue", simpleSystem, simpleSystem.createSerialQueue("test", DispatchOption.STICK_TO_CALLER_THREAD));
 
         latch = new RunnableCountDownLatch(1);
         advancedSystem.addShutdownWatcher(latch);

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/internal/advanced/DispatcherPoolTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/internal/advanced/DispatcherPoolTest.java?rev=887755&r1=887754&r2=887755&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/internal/advanced/DispatcherPoolTest.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/internal/advanced/DispatcherPoolTest.java Sun Dec  6 19:54:02 2009
@@ -21,8 +21,8 @@
 import org.apache.activemq.dispatch.DispatchQueue;
 import org.apache.activemq.dispatch.DispatcherConfig;
 
-
 import static java.lang.String.*;
+import static org.apache.activemq.dispatch.DispatchOption.*;
 
 /**
  * 
@@ -64,7 +64,7 @@
 
         private Work(CountDownLatch counter, AdvancedDispatcher dispatcher) {
             this.counter = counter;
-            dispatchQueue = dispatcher.createSerialQueue("test");
+            dispatchQueue = dispatcher.createSerialQueue("test", STICK_TO_CALLER_THREAD);
         }
 
         public void run() {

Modified: activemq/sandbox/activemq-apollo-actor/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java?rev=887755&r1=887754&r2=887755&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java Sun Dec  6 19:54:02 2009
@@ -38,9 +38,9 @@
 import org.apache.activemq.command.ActiveMQTextMessage;
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.command.ProducerId;
-import org.apache.activemq.dispatch.Dispatcher;
 import org.apache.activemq.dispatch.DispatchPriority;
 import org.apache.activemq.dispatch.DispatchQueue;
+import org.apache.activemq.dispatch.Dispatcher;
 import org.apache.activemq.dispatch.DispatcherConfig;
 import org.apache.activemq.flow.AbstractLimitedFlowResource;
 import org.apache.activemq.flow.Flow;
@@ -61,6 +61,8 @@
 import org.apache.activemq.queue.SingleFlowRelay;
 import org.apache.activemq.queue.Subscription;
 
+import static org.apache.activemq.dispatch.DispatchOption.*;
+
 public class SharedQueuePerfTest extends TestCase {
 
     private static int PERFORMANCE_SAMPLES = 5;
@@ -308,7 +310,7 @@
             sendRate.name("Producer " + name + " Rate");
             totalProducerRate.add(sendRate);
             
-            dispatchQueue = dispatcher.createSerialQueue(name);
+            dispatchQueue = dispatcher.createSerialQueue(name, STICK_TO_CALLER_THREAD);
             dispatchTask = new Runnable(){
                 public void run() {
                     dispatch();

Modified: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/AbstractFlowQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/AbstractFlowQueue.java?rev=887755&r1=887754&r2=887755&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/AbstractFlowQueue.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/AbstractFlowQueue.java Sun Dec  6 19:54:02 2009
@@ -19,11 +19,13 @@
 import java.util.ArrayList;
 import java.util.Collection;
 
-import org.apache.activemq.dispatch.DispatchQueue;
 import org.apache.activemq.dispatch.DispatchPriority;
+import org.apache.activemq.dispatch.DispatchQueue;
 import org.apache.activemq.dispatch.Dispatcher;
 import org.apache.activemq.flow.ISinkController.FlowControllable;
 
+import static org.apache.activemq.dispatch.DispatchOption.*;
+
 /**
  * Base class for a {@link FlowControllable}
  * {@link IFlowQueue}.
@@ -135,7 +137,7 @@
     public synchronized void setDispatcher(Dispatcher dispatcher) {
         this.dispatcher = dispatcher;
         
-        dispatchQueue = dispatcher.createSerialQueue(getResourceName());
+        dispatchQueue = dispatcher.createSerialQueue(getResourceName(), STICK_TO_CALLER_THREAD);
         dispatchTask = new Runnable(){
             public void run() {
                 if( pollingDispatch() ) {

Modified: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/RemoteProducer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/RemoteProducer.java?rev=887755&r1=887754&r2=887755&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/RemoteProducer.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/RemoteProducer.java Sun Dec  6 19:54:02 2009
@@ -11,6 +11,8 @@
 import org.apache.activemq.metric.MetricAggregator;
 import org.apache.activemq.metric.MetricCounter;
 
+import static org.apache.activemq.dispatch.DispatchOption.*;
+
 public class RemoteProducer extends ClientConnection implements FlowUnblockListener<Message> {
 
     private final MetricCounter rate = new MetricCounter();
@@ -48,7 +50,7 @@
         super.start();
         outboundController = outputQueue.getFlowController(outboundFlow);
         
-        dispatchQueue = getDispatcher().createSerialQueue(name + "-client");
+        dispatchQueue = getDispatcher().createSerialQueue(name + "-client", STICK_TO_CALLER_THREAD);
         dispatchTask = new Runnable(){
             public void run() {
                 dispatch();

Modified: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java?rev=887755&r1=887754&r2=887755&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java Sun Dec  6 19:54:02 2009
@@ -14,7 +14,6 @@
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.activemq.dispatch.DispatchPriority;
-import org.apache.activemq.dispatch.Dispatcher;
 import org.apache.activemq.dispatch.DispatchQueue;
 import org.apache.activemq.dispatch.Dispatcher;
 import org.apache.activemq.dispatch.internal.RunnableCountDownLatch;
@@ -34,6 +33,8 @@
 import org.apache.activemq.wireformat.WireFormat;
 import org.apache.activemq.wireformat.WireFormatFactory;
 
+import static org.apache.activemq.dispatch.DispatchOption.*;
+
 public class PipeTransportFactory extends TransportFactory {
     static private final Object EOF_TOKEN = new Object();
 
@@ -83,7 +84,7 @@
         }
 
         public void setDispatcher(Dispatcher dispatcher) {
-            dispatchQueue = dispatcher.createSerialQueue(name);
+            dispatchQueue = dispatcher.createSerialQueue(name, STICK_TO_CALLER_THREAD);
             dispatchTask = new Runnable(){
                 public void run() {
                     dispatch();