You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/12/04 16:32:13 UTC

svn commit: r887228 - in /activemq/sandbox/activemq-apollo-actor: activemq-broker/src/test/java/org/apache/activemq/broker/ activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/ activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/i...

Author: chirino
Date: Fri Dec  4 15:32:12 2009
New Revision: 887228

URL: http://svn.apache.org/viewvc?rev=887228&view=rev
Log:
More refactoring in efforts of eventaully unifying the adanced package model with the dispatch package model.


Added:
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchObserver.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispachableAdapter.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatchContext.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/Dispatchable.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/internal/
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/internal/advanced/
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/internal/advanced/DispatcherPoolTest.java
Modified:
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/RemoteProducer.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchObject.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchSystem.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/AbstractDispatchObject.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/Dispatcher.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherPool.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherThread.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/GlobalDispatchQueue.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/LoadBalancer.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/PooledDispatchContext.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/SimpleLoadBalancer.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/ThreadDispatchQueue.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/DispatcherThread.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/GlobalDispatchQueue.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/ThreadDispatchQueue.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/DispatchSystemTest.java
    activemq/sandbox/activemq-apollo-actor/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java
    activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/AbstractFlowQueue.java
    activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/RemoteProducer.java
    activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/RemoteProducer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/RemoteProducer.java?rev=887228&r1=887227&r2=887228&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/RemoteProducer.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/RemoteProducer.java Fri Dec  4 15:32:12 2009
@@ -6,8 +6,8 @@
 import org.apache.activemq.apollo.Connection;
 import org.apache.activemq.apollo.broker.Destination;
 import org.apache.activemq.apollo.broker.MessageDelivery;
-import org.apache.activemq.dispatch.internal.advanced.Dispatcher.DispatchContext;
-import org.apache.activemq.dispatch.internal.advanced.Dispatcher.Dispatchable;
+import org.apache.activemq.dispatch.internal.advanced.DispatchContext;
+import org.apache.activemq.dispatch.internal.advanced.Dispatchable;
 import org.apache.activemq.flow.IFlowController;
 import org.apache.activemq.flow.IFlowSink;
 import org.apache.activemq.flow.ISinkController;

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchObject.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchObject.java?rev=887228&r1=887227&r2=887228&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchObject.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchObject.java Fri Dec  4 15:32:12 2009
@@ -29,4 +29,5 @@
 
     public void setFinalizer(Runnable finalizer);
     public void setTargetQueue(DispatchQueue queue);
+    public DispatchQueue getTargetQueue();
 }

Added: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchObserver.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchObserver.java?rev=887228&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchObserver.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchObserver.java Fri Dec  4 15:32:12 2009
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.dispatch;
+
+import org.apache.activemq.dispatch.internal.advanced.DispatchContext;
+import org.apache.activemq.dispatch.internal.advanced.Dispatcher;
+import org.apache.activemq.dispatch.internal.advanced.PooledDispatchContext;
+
+public interface DispatchObserver {
+    
+    /**
+     * Should be called when a {@link DispatchContext#requestDispatch()} is called.
+     * This assists the load balancer in determining relationships between {@link DispatchContext}s
+     * @param caller The calling dispatcher
+     * @param context The context from which the dispatch is requested.
+     */
+    public void onDispatch(Dispatcher caller, PooledDispatchContext context);
+
+    /**
+     * Must be called by the dispatcher when a {@link DispatchContext} is closed.
+     */
+    public void close();
+}
\ No newline at end of file

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchSystem.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchSystem.java?rev=887228&r1=887227&r2=887228&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchSystem.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchSystem.java Fri Dec  4 15:32:12 2009
@@ -45,29 +45,38 @@
         return CURRENT_QUEUE.get();
     }
 
-    private final static DispatchSPI spi = cretateDispatchSystemSPI();
+    private static DispatchSPI spi;
+    
     private static DispatchSPI cretateDispatchSystemSPI() {
         return new SimpleDispatchSPI(Runtime.getRuntime().availableProcessors());
     }
     
+    synchronized private static DispatchSPI spi() {
+        if(spi==null) {
+            spi = cretateDispatchSystemSPI();
+        }
+        return spi;
+    }
+    
     static DispatchQueue getMainQueue() {
-        return spi.getMainQueue();
+        return spi().getMainQueue();
     }
     
     static public DispatchQueue getGlobalQueue(DispatchQueuePriority priority) {
-        return spi.getGlobalQueue(priority);
+        return spi().getGlobalQueue(priority);
     }
     
     static DispatchQueue createQueue(String label) {
-        return spi.createQueue(label);
+        return spi().createQueue(label);
     }
     
     static void dispatchMain() {
-        spi.dispatchMain();
+        spi().dispatchMain();
     }
 
     static DispatchSource createSource(SelectableChannel channel, int interestOps, DispatchQueue queue) {
-        return spi.createSource(channel, interestOps, queue);
+        return spi().createSource(channel, interestOps, queue);
     }
 
+
 }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/AbstractDispatchObject.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/AbstractDispatchObject.java?rev=887228&r1=887227&r2=887228&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/AbstractDispatchObject.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/AbstractDispatchObject.java Fri Dec  4 15:32:12 2009
@@ -46,5 +46,7 @@
         this.targetQueue = targetQueue;
     }
 
-
+    public DispatchQueue getTargetQueue() {
+        return this.targetQueue;
+    }
 }

Added: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispachableAdapter.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispachableAdapter.java?rev=887228&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispachableAdapter.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispachableAdapter.java Fri Dec  4 15:32:12 2009
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.dispatch.internal.advanced;
+
+
+public class DispachableAdapter implements Runnable {
+    
+    public static final int MAX_DISPATCH_PER_LOOP = 20;
+    
+    private Dispatchable dispatchable;
+
+    private final DispatcherPool pool;
+
+    public DispachableAdapter(DispatcherPool pool, Dispatchable dispatchable) {
+        this.pool = pool;
+        this.dispatchable = dispatchable;
+    }
+
+    public void run() {
+        int processed=0;
+        while( true ) {
+            if( dispatchable.dispatch() ) {
+                break;
+            }
+            processed++;
+            if( processed > MAX_DISPATCH_PER_LOOP ) {
+                // Still not done.. so we re-enqueue the dispatch request 
+                // and exit the current run loop.
+                pool.getCurrentDispatchContext().requestDispatch();
+                break;
+            }
+        }
+    }
+}
\ No newline at end of file

Added: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatchContext.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatchContext.java?rev=887228&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatchContext.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatchContext.java Fri Dec  4 15:32:12 2009
@@ -0,0 +1,44 @@
+package org.apache.activemq.dispatch.internal.advanced;
+
+import java.util.concurrent.RejectedExecutionException;
+
+
+/**
+ * Returned to callers registered with this dispathcer. Used by the caller
+ * to inform the dispatcher that it is ready for dispatch.
+ * 
+ * Note that DispatchContext is not safe for concurrent access by multiple
+ * threads.
+ */
+public interface DispatchContext {
+    /**
+     * Once registered with a dispatcher, this can be called to request
+     * dispatch. The {@link Dispatchable} will remain in the dispatch queue
+     * until a subsequent call to {@link Dispatchable#dispatch()} returns
+     * false;
+     * 
+     * @throws RejectedExecutionException If the dispatcher has been shutdown.
+     */
+    public void requestDispatch() throws RejectedExecutionException;
+
+    /**
+     * This can be called to update the dispatch priority.
+     * 
+     * @param priority
+     */
+    public void updatePriority(int priority);
+
+    /**
+     * Gets the name of the dispatch context
+     * 
+     * @return The dispatchable
+     */
+    public String getName();
+
+    /**
+     * This must be called to release any resource the dispatcher is holding
+     * on behalf of this context. Once called this {@link DispatchContext} should
+     * no longer be used. 
+     */
+    public void close(boolean sync);
+}
\ No newline at end of file

Added: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/Dispatchable.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/Dispatchable.java?rev=887228&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/Dispatchable.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/Dispatchable.java Fri Dec  4 15:32:12 2009
@@ -0,0 +1,15 @@
+package org.apache.activemq.dispatch.internal.advanced;
+
+/**
+ * This interface is implemented by Dispatchable entities. A Dispatchable
+ * entity registers with an {@link Dispatcher} and is returned a
+ * {@link DispatchContext} which it can use to request the
+ * {@link Dispatcher} to invoke {@link Dispatchable#dispatch()}
+ * 
+ * {@link Dispatcher} guarantees that {@link #dispatch()} will never invoke
+ * dispatch concurrently unless the {@link Dispatchable} is registered with
+ * more than one {@link Dispatcher};
+ */
+public interface Dispatchable {
+    public boolean dispatch();
+}
\ No newline at end of file

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/Dispatcher.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/Dispatcher.java?rev=887228&r1=887227&r2=887228&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/Dispatcher.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/Dispatcher.java Fri Dec  4 15:32:12 2009
@@ -17,92 +17,11 @@
 package org.apache.activemq.dispatch.internal.advanced;
 
 import java.util.concurrent.Executor;
-import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
 
 public interface Dispatcher extends Executor {
 
     /**
-     * This interface is implemented by Dispatchable entities. A Dispatchable
-     * entity registers with an {@link Dispatcher} and is returned a
-     * {@link DispatchContext} which it can use to request the
-     * {@link Dispatcher} to invoke {@link Dispatchable#dispatch()}
-     * 
-     * {@link Dispatcher} guarantees that {@link #dispatch()} will never invoke
-     * dispatch concurrently unless the {@link Dispatchable} is registered with
-     * more than one {@link Dispatcher};
-     */
-    public interface Dispatchable {
-        public boolean dispatch();
-    }
-
-    /**
-     * Returned to callers registered with this dispathcer. Used by the caller
-     * to inform the dispatcher that it is ready for dispatch.
-     * 
-     * Note that DispatchContext is not safe for concurrent access by multiple
-     * threads.
-     */
-    public interface DispatchContext {
-        /**
-         * Once registered with a dispatcher, this can be called to request
-         * dispatch. The {@link Dispatchable} will remain in the dispatch queue
-         * until a subsequent call to {@link Dispatchable#dispatch()} returns
-         * false;
-         * 
-         * @throws RejectedExecutionException If the dispatcher has been shutdown.
-         */
-        public void requestDispatch() throws RejectedExecutionException;
-
-        /**
-         * This can be called to update the dispatch priority.
-         * 
-         * @param priority
-         */
-        public void updatePriority(int priority);
-
-        /**
-         * Gets the Dispatchable that this context represents.
-         * 
-         * @return The dispatchable
-         */
-        public Dispatchable getDispatchable();
-
-        /**
-         * Gets the name of the dispatch context
-         * 
-         * @return The dispatchable
-         */
-        public String getName();
-
-        /**
-         * This must be called to release any resource the dispatcher is holding
-         * on behalf of this context. Once called this {@link DispatchContext} should
-         * no longer be used. 
-         */
-        public void close(boolean sync);
-    }
-
-    public class RunnableAdapter implements Dispatchable, Runnable {
-        private Runnable runnable;
-
-        public RunnableAdapter() {
-            runnable = this;
-        }
-        public RunnableAdapter(Runnable runnable) {
-            this.runnable = runnable;
-        }
-
-        public boolean dispatch() {
-            runnable.run();
-            return true;
-        }
-
-        public void run() {
-        }
-    }
-
-    /**
      * Registers a {@link Dispatchable} with this dispatcher, and returns a
      * {@link DispatchContext} that the caller can use to request dispatch.
      * 
@@ -113,6 +32,8 @@
      * @return A {@link DispatchContext} that can be used to request dispatch
      */
     public DispatchContext register(Dispatchable dispatchable, String name);
+    
+    public DispatchContext register(Runnable runnable, String name);
 
     /**
      * Gets the number of dispatch priorities. Dispatch priorities are 0 based, 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherPool.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherPool.java?rev=887228&r1=887227&r2=887228&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherPool.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherPool.java Fri Dec  4 15:32:12 2009
@@ -167,6 +167,10 @@
         return chooseDispatcher().register(dispatchable, name);
     }
 
+    public DispatchContext register(Runnable runnable, String name) {
+        return chooseDispatcher().register(runnable, name);
+    }
+
     public String toString() {
         return name;
     }
@@ -182,7 +186,7 @@
     public final Executor createPriorityExecutor(final int priority) {
         return new Executor() {
             public void execute(final Runnable runnable) {
-                chooseDispatcher().dispatch(new RunnableAdapter(runnable), priority);
+                chooseDispatcher().dispatch(runnable, priority);
             }
 
         };
@@ -194,11 +198,11 @@
     }
 
     public void execute(Runnable command) {
-        chooseDispatcher().dispatch(new RunnableAdapter(command), 0);
+        chooseDispatcher().dispatch(command, 0);
     }
     
     public void execute(Runnable command, int priority) {
-        chooseDispatcher().dispatch(new RunnableAdapter(command), priority);
+        chooseDispatcher().dispatch(command, priority);
     }
 
     public void schedule(final Runnable runnable, long delay, TimeUnit timeUnit) {

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherThread.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherThread.java?rev=887228&r1=887227&r2=887228&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherThread.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherThread.java Fri Dec  4 15:32:12 2009
@@ -25,9 +25,9 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.activemq.dispatch.DispatchObserver;
 import org.apache.activemq.dispatch.DispatchSystem;
 import org.apache.activemq.dispatch.DispatchSystem.DispatchQueuePriority;
-import org.apache.activemq.dispatch.internal.advanced.LoadBalancer.ExecutionTracker;
 import org.apache.activemq.util.Mapper;
 import org.apache.activemq.util.PriorityLinkedList;
 import org.apache.activemq.util.TimerHeap;
@@ -46,7 +46,7 @@
     protected final HashSet<PriorityDispatchContext> contexts = new HashSet<PriorityDispatchContext>();
 
     // Set if this dispatcher is part of a dispatch pool:
-    protected final DispatcherPool pooledDispatcher;
+    protected final DispatcherPool dispatcherPool;
 
     // The local dispatch queue:
     protected final PriorityLinkedList<PriorityDispatchContext> priorityQueue;
@@ -88,7 +88,7 @@
         for (int i = 0; i < 2; i++) {
             foreignQueue[i] = new LinkedNodeList<ForeignEvent>();
         }
-        this.pooledDispatcher = pooledDispactcher;
+        this.dispatcherPool = pooledDispactcher;
     }
 
     public static final Dispatcher createPriorityDispatcher(String name, int numPriorities) {
@@ -145,7 +145,11 @@
     }
 
     public DispatchContext register(Dispatchable dispatchable, String name) {
-        return new PriorityDispatchContext(dispatchable, true, name);
+        return register(new DispachableAdapter(dispatcherPool, dispatchable), name);
+    }
+    
+    public DispatchContext register(Runnable runnable, String name) {
+        return new PriorityDispatchContext(runnable, true, name);
     }
 
     /*
@@ -170,7 +174,7 @@
         Thread joinThread = null;
         synchronized (this) {
             if (thread != null) {
-                dispatchInternal(new RunnableAdapter() {
+                dispatchInternal(new Runnable() {
                     public void run() {
                         running = false;
                     }
@@ -200,52 +204,37 @@
 
     public void run() {
 
-        if (pooledDispatcher != null) {
+        if (dispatcherPool != null) {
             // Inform the dispatcher that we have started:
-            pooledDispatcher.onDispatcherStarted((DispatcherThread) this);
+            dispatcherPool.onDispatcherStarted((DispatcherThread) this);
         }
 
         PriorityDispatchContext pdc;
         try {
-            final int MAX_DISPATCH_PER_LOOP = 20;
-            int processed = 0;
-
             while (running) {
-                pdc = priorityQueue.poll();
+                int counter = 0;
                 // If no local work available wait for foreign work:
-                if (pdc == null) {
-                    waitForEvents();
-                } else {
+                while((pdc = priorityQueue.poll())!=null){
                     if( pdc.priority < dispatchQueues.length ) {
                         DispatchSystem.CURRENT_QUEUE.set(dispatchQueues[pdc.priority]);
                     }
                     
                     if (pdc.tracker != null) {
-                        pooledDispatcher.setCurrentDispatchContext(pdc);
+                        dispatcherPool.setCurrentDispatchContext(pdc);
                     }
 
-                    while (!pdc.dispatch()) {
-                        processed++;
-                        if (processed > MAX_DISPATCH_PER_LOOP || pdc.listPrio < priorityQueue.getHighestPriority()) {
-                            // Give other dispatchables a shot:
-                            // May have gotten relinked by the caller:
-                            if (!pdc.isLinked()) {
-                                priorityQueue.add(pdc, pdc.listPrio);
-                            }
-                            break;
-                        }
-                    }
+                    counter++;
+                    pdc.run();
 
                     if (pdc.tracker != null) {
-                        pooledDispatcher.setCurrentDispatchContext(null);
+                        dispatcherPool.setCurrentDispatchContext(null);
                     }
+                }
 
-                    if (processed < MAX_DISPATCH_PER_LOOP) {
-                        continue;
-                    }
+                if( counter==0 ) {
+                    waitForEvents();
                 }
 
-                processed = 0;
                 // Execute delayed events:
                 timerHeap.executeReadyTimers();
 
@@ -271,7 +260,6 @@
                         fe.unlink();
                         fe.execute();
                     }
-
                 }
             }
         } catch (InterruptedException e) {
@@ -279,8 +267,8 @@
         } catch (Throwable thrown) {
             thrown.printStackTrace();
         } finally {
-            if (pooledDispatcher != null) {
-                pooledDispatcher.onDispatcherStopped((DispatcherThread) this);
+            if (dispatcherPool != null) {
+                dispatcherPool.onDispatcherStopped((DispatcherThread) this);
             }
             cleanup();
         }
@@ -362,8 +350,8 @@
     }
 
     //Special dispatch method that allow high priority dispatch:
-    private final void dispatchInternal(Dispatchable dispatchable, int priority) {
-        PriorityDispatchContext context = new PriorityDispatchContext(dispatchable, false, name);
+    private final void dispatchInternal(Runnable runnable, int priority) {
+        PriorityDispatchContext context = new PriorityDispatchContext(runnable, false, name);
         context.priority = priority;
         context.requestDispatch();
     }
@@ -375,8 +363,8 @@
      * org.apache.activemq.dispatch.IDispatcher#dispatch(org.apache.activemq
      * .dispatch.Dispatcher.Dispatchable)
      */
-    public final void dispatch(Dispatchable dispatchable, int priority) {
-        PriorityDispatchContext context = new PriorityDispatchContext(dispatchable, false, name);
+    public final void dispatch(Runnable runnable, int priority) {
+        PriorityDispatchContext context = new PriorityDispatchContext(runnable, false, name);
         context.updatePriority(priority);
         context.requestDispatch();
     }
@@ -391,17 +379,17 @@
         return new Executor() {
 
             public void execute(final Runnable runnable) {
-                dispatch(new RunnableAdapter(runnable), priority);
+                dispatch(runnable, priority);
             }
         };
     }
 
     public void execute(final Runnable runnable) {
-        dispatch(new RunnableAdapter(runnable), 0);
+        dispatch(runnable, 0);
     }
     
     public void execute(final Runnable runnable, int prio) {
-        dispatch(new RunnableAdapter(runnable), prio);
+        dispatch(runnable, prio);
     }
 
     /*
@@ -437,8 +425,8 @@
     }
 
     private final DispatcherThread getCurrentDispatcher() {
-        if (pooledDispatcher != null) {
-            return (DispatcherThread) pooledDispatcher.getCurrentDispatcher();
+        if (dispatcherPool != null) {
+            return (DispatcherThread) dispatcherPool.getCurrentDispatcher();
         } else if (Thread.currentThread() == thread) {
             return (DispatcherThread) this;
         } else {
@@ -448,15 +436,15 @@
     }
 
     private final PooledDispatchContext getCurrentDispatchContext() {
-        return pooledDispatcher.getCurrentDispatchContext();
+        return dispatcherPool.getCurrentDispatchContext();
     }
 
     /**
      * 
      */
     protected class PriorityDispatchContext extends LinkedNode<PriorityDispatchContext> implements PooledDispatchContext {
-        // The dispatchable target:
-        private final Dispatchable dispatchable;
+        // The target:
+        private final Runnable runnable;
         // The name of this context:
         final String name;
         // list prio can only be updated in the thread of of the owning
@@ -467,7 +455,7 @@
         // from foreign threads:
         final UpdateEvent updateEvent[];
 
-        private final ExecutionTracker tracker;
+        private final DispatchObserver tracker;
         protected DispatcherThread currentOwner;
         private DispatcherThread updateDispatcher = null;
 
@@ -476,12 +464,12 @@
         private boolean closed = false;
         final CountDownLatch closeLatch = new CountDownLatch(1);
 
-        protected PriorityDispatchContext(Dispatchable dispatchable, boolean persistent, String name) {
-            this.dispatchable = dispatchable;
+        protected PriorityDispatchContext(Runnable runnable, boolean persistent, String name) {
+            this.runnable = runnable;
             this.name = name;
             this.currentOwner = (DispatcherThread) DispatcherThread.this;
-            if (persistent && pooledDispatcher != null) {
-                this.tracker = pooledDispatcher.getLoadBalancer().createExecutionTracker((PooledDispatchContext) this);
+            if (persistent && dispatcherPool != null) {
+                this.tracker = dispatcherPool.getLoadBalancer().createExecutionTracker((PooledDispatchContext) this);
             } else {
                 this.tracker = null;
             }
@@ -502,7 +490,7 @@
          * 
          * @return the execution tracker for the context:
          */
-        public ExecutionTracker getExecutionTracker() {
+        public DispatchObserver getExecutionTracker() {
             return tracker;
         }
 
@@ -511,11 +499,11 @@
          * 
          * @return False if the dispatchable has more work to do.
          */
-        public final boolean dispatch() {
-            return dispatchable.dispatch();
+        public final void run() {
+            runnable.run();
         }
 
-        public final void assignToNewDispatcher(Dispatcher newDispatcher) {
+        public final void setTargetQueue(Dispatcher newDispatcher) {
             synchronized (this) {
 
                 // If we're already set to this dispatcher
@@ -538,7 +526,7 @@
 
             DispatcherThread callingDispatcher = getCurrentDispatcher();
             if (tracker != null)
-                tracker.onDispatchRequest(callingDispatcher, getCurrentDispatchContext());
+                tracker.onDispatch(callingDispatcher, getCurrentDispatchContext());
 
             // Otherwise this is coming off another thread, so we need to
             // synchronize
@@ -691,11 +679,7 @@
             return getName();
         }
 
-        public Dispatchable getDispatchable() {
-            return dispatchable;
-        }
-
-        public DispatcherThread getDispatcher() {
+        public DispatcherThread getTargetQueue() {
             return currentOwner;
         }
 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/GlobalDispatchQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/GlobalDispatchQueue.java?rev=887228&r1=887227&r2=887228&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/GlobalDispatchQueue.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/GlobalDispatchQueue.java Fri Dec  4 15:32:12 2009
@@ -84,4 +84,7 @@
         throw new UnsupportedOperationException();
     }
 
+    public DispatchQueue getTargetQueue() {
+        throw new UnsupportedOperationException();
+    }
 }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/LoadBalancer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/LoadBalancer.java?rev=887228&r1=887227&r2=887228&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/LoadBalancer.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/LoadBalancer.java Fri Dec  4 15:32:12 2009
@@ -16,27 +16,12 @@
  */
 package org.apache.activemq.dispatch.internal.advanced;
 
+import org.apache.activemq.dispatch.DispatchObserver;
+
 
-import org.apache.activemq.dispatch.internal.advanced.Dispatcher.DispatchContext;
 
 public interface LoadBalancer {
 
-    public interface ExecutionTracker {
-        
-        /**
-         * Should be called when a {@link DispatchContext#requestDispatch()} is called.
-         * This assists the load balancer in determining relationships between {@link DispatchContext}s
-         * @param caller The calling dispatcher
-         * @param context The context from which the dispatch is requested.
-         */
-        public void onDispatchRequest(Dispatcher caller, PooledDispatchContext context);
-
-        /**
-         * Must be called by the dispatcher when a {@link DispatchContext} is closed.
-         */
-        public void close();
-    }
-    
     /**
      * Must be called by a dispatch thread when it starts
      * @param dispatcher The dispatcher
@@ -50,11 +35,11 @@
     public void onDispatcherStopped(Dispatcher dispatcher);
 
     /**
-     * Gets an {@link ExecutionTracker} for the dispatch context. 
+     * Gets an {@link DispatchObserver} for the dispatch context. 
      * @param context
      * @return
      */
-    public ExecutionTracker createExecutionTracker(PooledDispatchContext context);
+    public DispatchObserver createExecutionTracker(PooledDispatchContext context);
 
     /**
      * Starts execution tracking

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/PooledDispatchContext.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/PooledDispatchContext.java?rev=887228&r1=887227&r2=887228&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/PooledDispatchContext.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/PooledDispatchContext.java Fri Dec  4 15:32:12 2009
@@ -1,7 +1,6 @@
 package org.apache.activemq.dispatch.internal.advanced;
 
-import org.apache.activemq.dispatch.internal.advanced.LoadBalancer.ExecutionTracker;
-import org.apache.activemq.dispatch.internal.advanced.Dispatcher.DispatchContext;
+import org.apache.activemq.dispatch.DispatchObserver;
 
 /**
  * A {@link PooledDispatchContext}s can be moved between different
@@ -12,7 +11,7 @@
      * Called to transfer a {@link PooledDispatchContext} to a new
      * Dispatcher.
      */
-    public void assignToNewDispatcher(Dispatcher newDispatcher);
+    public void setTargetQueue(Dispatcher newDispatcher);
 
     /**
      * Gets the dispatcher to which this PooledDispatchContext currently
@@ -20,12 +19,12 @@
      * 
      * @return
      */
-    public Dispatcher getDispatcher();
+    public Dispatcher getTargetQueue();
 
     /**
      * Gets the execution tracker for the context.
      * 
      * @return the execution tracker for the context:
      */
-    public ExecutionTracker getExecutionTracker();
+    public DispatchObserver getExecutionTracker();
 }
\ No newline at end of file

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/SimpleLoadBalancer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/SimpleLoadBalancer.java?rev=887228&r1=887227&r2=887228&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/SimpleLoadBalancer.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/SimpleLoadBalancer.java Fri Dec  4 15:32:12 2009
@@ -22,6 +22,8 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.activemq.dispatch.DispatchObserver;
+
 
 public class SimpleLoadBalancer implements LoadBalancer {
 
@@ -97,7 +99,7 @@
         dispatchers.remove(dispatcher);
     }
 
-    public ExecutionTracker createExecutionTracker(PooledDispatchContext context) {
+    public DispatchObserver createExecutionTracker(PooledDispatchContext context) {
         return new SimpleExecutionTracker(context);
     }
 
@@ -116,7 +118,7 @@
         }
     }
 
-    private class SimpleExecutionTracker implements ExecutionTracker {
+    private class SimpleExecutionTracker implements DispatchObserver {
         private final HashMap<PooledDispatchContext, ExecutionStats> sources = new HashMap<PooledDispatchContext, ExecutionStats>();
         private final PooledDispatchContext context;
         private final AtomicInteger work = new AtomicInteger(0);
@@ -126,7 +128,7 @@
 
         SimpleExecutionTracker(PooledDispatchContext context) {
             this.context = context;
-            currentOwner = context.getDispatcher();
+            currentOwner = context.getTargetQueue();
         }
 
         /**
@@ -143,7 +145,7 @@
          * @return True if this method resulted in the dispatch request being
          *         assigned to another dispatcher.
          */
-        public void onDispatchRequest(Dispatcher callingDispatcher, PooledDispatchContext callingContext) {
+        public void onDispatch(Dispatcher callingDispatcher, PooledDispatchContext callingContext) {
 
             if (callingContext != null) {
                 // Make sure we are being called by another node:
@@ -166,7 +168,7 @@
                                 System.out.println("Assigning: " + context + " to " + callingContext + "'s  dispatcher: " + callingDispatcher + " From: " + currentOwner);
 
                             currentOwner = callingDispatcher;
-                            context.assignToNewDispatcher(callingDispatcher);
+                            context.setTargetQueue(callingDispatcher);
                         }
 
                     } else {

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/ThreadDispatchQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/ThreadDispatchQueue.java?rev=887228&r1=887227&r2=887228&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/ThreadDispatchQueue.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/ThreadDispatchQueue.java Fri Dec  4 15:32:12 2009
@@ -82,4 +82,8 @@
         throw new UnsupportedOperationException();
     }
 
+    public DispatchQueue getTargetQueue() {
+        throw new UnsupportedOperationException();
+    }
+
 }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/DispatcherThread.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/DispatcherThread.java?rev=887228&r1=887227&r2=887228&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/DispatcherThread.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/DispatcherThread.java Fri Dec  4 15:32:12 2009
@@ -27,6 +27,7 @@
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
 final public class DispatcherThread extends Thread {
+    private static final int MAX_DISPATCH_BEFORE_CHECKING_FOR_HIGHER_PRIO = 10000;
     private final SimpleDispatchSPI spi;
     private final ThreadDispatchQueue[] threadQueues;
     final AtomicLong threadQueuedRunnables = new AtomicLong();
@@ -78,6 +79,20 @@
                 // no runnables to dispatch.
                 continue;
             }
+        
+//        GlobalDispatchQueue[] globalQueues = spi.globalQueues;
+//        while( true ) {
+//
+//            if( dispatch(threadQueues[0]) 
+//                || dispatch(globalQueues[0]) 
+//                || dispatch(threadQueues[1]) 
+//                || dispatch(globalQueues[1]) 
+//                || dispatch(threadQueues[2]) 
+//                || dispatch(globalQueues[2]) 
+//                ) {
+//                continue;
+//            }
+//        
             try {
                 waitForWakeup();
             } catch (InterruptedException e) {
@@ -87,6 +102,23 @@
         }
     }
 
+    private boolean dispatch(SimpleQueue queue) {
+        int counter=0;
+        Runnable runnable;
+        while( counter < MAX_DISPATCH_BEFORE_CHECKING_FOR_HIGHER_PRIO ) {
+            runnable = queue.poll();
+            if( runnable == null ) {
+                break;
+            }        
+            if( counter==0 ) {
+                DispatchSystem.CURRENT_QUEUE.set(queue);
+            }
+            dispatch(runnable);
+            counter++;
+        }
+        return counter!=0;
+    }
+
     private void dispatch(Runnable runnable) {
         try {
             runnable.run();

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/GlobalDispatchQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/GlobalDispatchQueue.java?rev=887228&r1=887227&r2=887228&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/GlobalDispatchQueue.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/GlobalDispatchQueue.java Fri Dec  4 15:32:12 2009
@@ -87,6 +87,10 @@
     public void setTargetQueue(DispatchQueue queue) {
         throw new UnsupportedOperationException();
     }
+
+    public DispatchQueue getTargetQueue() {
+        throw new UnsupportedOperationException();
+    }
     
     public Runnable poll() {
         Runnable rc = runnables.poll();

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/ThreadDispatchQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/ThreadDispatchQueue.java?rev=887228&r1=887227&r2=887228&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/ThreadDispatchQueue.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/ThreadDispatchQueue.java Fri Dec  4 15:32:12 2009
@@ -114,6 +114,10 @@
     public void setTargetQueue(DispatchQueue queue) {
         throw new UnsupportedOperationException();
     }
+    public DispatchQueue getTargetQueue() {
+        throw new UnsupportedOperationException();
+    }
+    
     
     public DispatchQueuePriority getPriority() {
         return priority;

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/DispatchSystemTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/DispatchSystemTest.java?rev=887228&r1=887227&r2=887228&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/DispatchSystemTest.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/DispatchSystemTest.java Fri Dec  4 15:32:12 2009
@@ -32,9 +32,9 @@
 public class DispatchSystemTest {
     
     public static void main(String[] args) throws Exception {
-//        DispatchSPI advancedSystem = new AdancedDispatchSPI(Runtime.getRuntime().availableProcessors());
-//        benchmark("advanced global queue", advancedSystem, advancedSystem.getGlobalQueue(DEFAULT));
-//        benchmark("advanced private serial queue", advancedSystem, advancedSystem.createQueue("test"));
+        DispatchSPI advancedSystem = new AdancedDispatchSPI(Runtime.getRuntime().availableProcessors());
+        benchmark("advanced global queue", advancedSystem, advancedSystem.getGlobalQueue(DEFAULT));
+        benchmark("advanced private serial queue", advancedSystem, advancedSystem.createQueue("test"));
 
         DispatchSPI simpleSystem = new SimpleDispatchSPI(Runtime.getRuntime().availableProcessors());
         benchmark("simple global queue", simpleSystem, simpleSystem.getGlobalQueue(DEFAULT));

Added: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/internal/advanced/DispatcherPoolTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/internal/advanced/DispatcherPoolTest.java?rev=887228&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/internal/advanced/DispatcherPoolTest.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/internal/advanced/DispatcherPoolTest.java Fri Dec  4 15:32:12 2009
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.dispatch.internal.advanced;
+
+import java.util.concurrent.CountDownLatch;
+
+
+import static java.lang.String.*;
+
+/**
+ * 
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class DispatcherPoolTest {
+    
+    public static void main(String[] args) throws Exception {
+        DispatcherPool pooledDispatcher = new DispatcherPool("default", Runtime.getRuntime().availableProcessors(), 3);
+        pooledDispatcher.start();
+        
+        // warm the JIT up..
+        benchmarkWork(pooledDispatcher, 100000);
+        
+        int iterations = 1000*1000*20;
+        long start = System.nanoTime();
+        benchmarkWork(pooledDispatcher, iterations);
+        long end = System.nanoTime();
+        
+        double durationMS = 1.0d*(end-start)/1000000d;
+        double rate = 1000d * iterations / durationMS;
+        
+        pooledDispatcher.shutdown();
+        System.out.println(format("duration: %,.3f ms, rate: %,.2f executions/sec", durationMS, rate));
+    }
+
+    private static void benchmarkWork(final DispatcherPool pooledDispatcher, int iterations) throws InterruptedException {
+        final CountDownLatch counter = new CountDownLatch(iterations);
+        for (int i = 0; i < 1000; i++) {
+            Work dispatchable = new Work(counter, pooledDispatcher);
+            dispatchable.context.requestDispatch();
+        }
+        counter.await();
+    }
+    
+    private static final class Work implements Dispatchable {
+        private final CountDownLatch counter;
+        private final DispatchContext context;
+
+        private Work(CountDownLatch counter, DispatcherPool pooledDispatcher) {
+            this.counter = counter;
+            this.context = pooledDispatcher.register(this , "test");
+        }
+
+        public boolean dispatch() {
+            counter.countDown();
+            if( counter.getCount()>0 ) {
+                context.requestDispatch();
+            }
+            return true;
+        }
+    }
+
+    
+}

Modified: activemq/sandbox/activemq-apollo-actor/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java?rev=887228&r1=887227&r2=887228&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java Fri Dec  4 15:32:12 2009
@@ -39,10 +39,10 @@
 import org.apache.activemq.command.ActiveMQTextMessage;
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.command.ProducerId;
+import org.apache.activemq.dispatch.internal.advanced.DispatchContext;
+import org.apache.activemq.dispatch.internal.advanced.Dispatchable;
 import org.apache.activemq.dispatch.internal.advanced.Dispatcher;
 import org.apache.activemq.dispatch.internal.advanced.DispatcherThread;
-import org.apache.activemq.dispatch.internal.advanced.Dispatcher.DispatchContext;
-import org.apache.activemq.dispatch.internal.advanced.Dispatcher.Dispatchable;
 import org.apache.activemq.flow.AbstractLimitedFlowResource;
 import org.apache.activemq.flow.Flow;
 import org.apache.activemq.flow.FlowController;

Modified: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/AbstractFlowQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/AbstractFlowQueue.java?rev=887228&r1=887227&r2=887228&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/AbstractFlowQueue.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/AbstractFlowQueue.java Fri Dec  4 15:32:12 2009
@@ -19,9 +19,9 @@
 import java.util.ArrayList;
 import java.util.Collection;
 
+import org.apache.activemq.dispatch.internal.advanced.DispatchContext;
+import org.apache.activemq.dispatch.internal.advanced.Dispatchable;
 import org.apache.activemq.dispatch.internal.advanced.Dispatcher;
-import org.apache.activemq.dispatch.internal.advanced.Dispatcher.DispatchContext;
-import org.apache.activemq.dispatch.internal.advanced.Dispatcher.Dispatchable;
 import org.apache.activemq.flow.ISinkController.FlowControllable;
 
 /**

Modified: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/RemoteProducer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/RemoteProducer.java?rev=887228&r1=887227&r2=887228&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/RemoteProducer.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/RemoteProducer.java Fri Dec  4 15:32:12 2009
@@ -2,8 +2,8 @@
 
 import java.util.concurrent.atomic.AtomicLong;
 
-import org.apache.activemq.dispatch.internal.advanced.Dispatcher.DispatchContext;
-import org.apache.activemq.dispatch.internal.advanced.Dispatcher.Dispatchable;
+import org.apache.activemq.dispatch.internal.advanced.DispatchContext;
+import org.apache.activemq.dispatch.internal.advanced.Dispatchable;
 import org.apache.activemq.flow.IFlowController;
 import org.apache.activemq.flow.ISinkController;
 import org.apache.activemq.flow.ISourceController;

Modified: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java?rev=887228&r1=887227&r2=887228&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java Fri Dec  4 15:32:12 2009
@@ -13,9 +13,9 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.activemq.dispatch.internal.advanced.DispatchContext;
+import org.apache.activemq.dispatch.internal.advanced.Dispatchable;
 import org.apache.activemq.dispatch.internal.advanced.Dispatcher;
-import org.apache.activemq.dispatch.internal.advanced.Dispatcher.DispatchContext;
-import org.apache.activemq.dispatch.internal.advanced.Dispatcher.Dispatchable;
 import org.apache.activemq.transport.DispatchableTransport;
 import org.apache.activemq.transport.FutureResponse;
 import org.apache.activemq.transport.ResponseCallback;
@@ -77,7 +77,7 @@
         }
 
         public void setDispatcher(Dispatcher dispatcher) {
-            readContext = dispatcher.register(this, name);
+            readContext = dispatcher.register((Dispatchable)this, name);
         }
 
         public void onReadReady(Pipe<Object> pipe) {