You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/12/04 20:57:52 UTC

svn commit: r887337 - in /activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch: ./ internal/advanced/

Author: chirino
Date: Fri Dec  4 19:57:51 2009
New Revision: 887337

URL: http://svn.apache.org/viewvc?rev=887337&view=rev
Log:
removed the Dispatcher interface since it was only implmented by one class.

Added:
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/LoadBalancer.java
      - copied, changed from r887334, activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/LoadBalancer.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherThread.java
Removed:
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/Dispatcher.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/LoadBalancer.java
Modified:
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchObserver.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdvancedDispatchSPI.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatchContext.java
    activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/SimpleLoadBalancer.java

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchObserver.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchObserver.java?rev=887337&r1=887336&r2=887337&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchObserver.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchObserver.java Fri Dec  4 19:57:51 2009
@@ -18,7 +18,7 @@
 package org.apache.activemq.dispatch;
 
 import org.apache.activemq.dispatch.internal.advanced.DispatchContext;
-import org.apache.activemq.dispatch.internal.advanced.Dispatcher;
+import org.apache.activemq.dispatch.internal.advanced.DispatcherThread;
 
 public interface DispatchObserver {
     
@@ -28,7 +28,7 @@
      * @param caller The calling dispatcher
      * @param context The context from which the dispatch is requested.
      */
-    public void onDispatch(Dispatcher caller, DispatchContext context);
+    public void onDispatch(DispatcherThread caller, DispatchContext context);
 
     /**
      * Must be called by the dispatcher when a {@link DispatchContext} is closed.

Copied: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/LoadBalancer.java (from r887334, activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/LoadBalancer.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/LoadBalancer.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/LoadBalancer.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/LoadBalancer.java&r1=887334&r2=887337&rev=887337&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/LoadBalancer.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/LoadBalancer.java Fri Dec  4 19:57:51 2009
@@ -14,9 +14,10 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.dispatch.internal.advanced;
+package org.apache.activemq.dispatch;
 
-import org.apache.activemq.dispatch.DispatchObserver;
+import org.apache.activemq.dispatch.internal.advanced.DispatchContext;
+import org.apache.activemq.dispatch.internal.advanced.DispatcherThread;
 
 
 
@@ -26,13 +27,13 @@
      * Must be called by a dispatch thread when it starts
      * @param dispatcher The dispatcher
      */
-    public void onDispatcherStarted(Dispatcher dispatcher);
+    public void onDispatcherStarted(DispatcherThread dispatcher);
 
     /**
      * Must be called by a dispatch thread when it stops
      * @param dispatcher The dispatcher
      */
-    public void onDispatcherStopped(Dispatcher dispatcher);
+    public void onDispatcherStopped(DispatcherThread dispatcher);
 
     /**
      * Gets an {@link DispatchObserver} for the dispatch context. 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdvancedDispatchSPI.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdvancedDispatchSPI.java?rev=887337&r1=887336&r2=887337&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdvancedDispatchSPI.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdvancedDispatchSPI.java Fri Dec  4 19:57:51 2009
@@ -27,6 +27,7 @@
 import org.apache.activemq.dispatch.DispatchQueue;
 import org.apache.activemq.dispatch.DispatchSPI;
 import org.apache.activemq.dispatch.DispatchSource;
+import org.apache.activemq.dispatch.LoadBalancer;
 import org.apache.activemq.dispatch.DispatchSystem.DispatchQueuePriority;
 import org.apache.activemq.dispatch.internal.SerialDispatchQueue;
 
@@ -78,7 +79,7 @@
     }
 
     /**
-     * @see org.apache.activemq.dispatch.internal.advanced.Dispatcher#start()
+     * @see org.apache.activemq.dispatch.internal.advanced.DispatcherThread#start()
      */
     public synchronized final void start()  {
         if( startCounter.getAndIncrement()==0 ) {
@@ -129,7 +130,7 @@
      * 
      * @return The currently executing dispatcher
      */
-    public Dispatcher getCurrentDispatcher() {
+    public DispatcherThread getCurrentDispatcher() {
         return dispatcher.get();
     }
 
@@ -149,7 +150,7 @@
     /**
      * A Dispatcher must call this when exiting it's dispatch loop
      */
-    public void onDispatcherStopped(Dispatcher d) {
+    public void onDispatcherStopped(DispatcherThread d) {
         synchronized (dispatchers) {
             if (dispatchers.remove(d)) {
                 size--;

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatchContext.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatchContext.java?rev=887337&r1=887336&r2=887337&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatchContext.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatchContext.java Fri Dec  4 19:57:51 2009
@@ -49,7 +49,7 @@
      * Called to transfer a {@link PooledDispatchContext} to a new
      * Dispatcher.
      */
-    public void setTargetQueue(Dispatcher newDispatcher);
+    public void setTargetQueue(DispatcherThread newDispatcher);
 
     /**
      * Gets the dispatcher to which this PooledDispatchContext currently
@@ -57,7 +57,7 @@
      * 
      * @return
      */
-    public Dispatcher getTargetQueue();
+    public DispatcherThread getTargetQueue();
 
     /**
      * Gets the execution tracker for the context.

Added: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherThread.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherThread.java?rev=887337&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherThread.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherThread.java Fri Dec  4 19:57:51 2009
@@ -0,0 +1,697 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.dispatch.internal.advanced;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.dispatch.DispatchObserver;
+import org.apache.activemq.dispatch.DispatchSystem;
+import org.apache.activemq.dispatch.DispatchSystem.DispatchQueuePriority;
+import org.apache.activemq.util.Mapper;
+import org.apache.activemq.util.PriorityLinkedList;
+import org.apache.activemq.util.TimerHeap;
+import org.apache.activemq.util.list.LinkedNode;
+import org.apache.activemq.util.list.LinkedNodeList;
+
+public class DispatcherThread implements Runnable {
+
+    private final ThreadDispatchQueue dispatchQueues[];
+    
+    private static final boolean DEBUG = false;
+    private Thread thread;
+    protected boolean running = false;
+    private boolean threaded = false;
+    protected final int MAX_USER_PRIORITY;
+    protected final HashSet<PriorityDispatchContext> contexts = new HashSet<PriorityDispatchContext>();
+
+    // Set if this dispatcher is part of a dispatch pool:
+    protected final AdvancedDispatchSPI spi;
+
+    // The local dispatch queue:
+    protected final PriorityLinkedList<PriorityDispatchContext> priorityQueue;
+
+    // Dispatch queue for requests from other threads:
+    private final LinkedNodeList<ForeignEvent>[] foreignQueue;
+    private static final int[] TOGGLE = new int[] { 1, 0 };
+    private int foreignToggle = 0;
+
+    // Timed Execution List
+    protected final TimerHeap<Runnable> timerHeap = new TimerHeap<Runnable>() {
+        @Override
+        protected final void execute(Runnable ready) {
+            ready.run();
+        }
+    };
+
+    protected final String name;
+    private final AtomicBoolean foreignAvailable = new AtomicBoolean(false);
+    private final Semaphore foreignPermits = new Semaphore(0);
+
+    private final Mapper<Integer, PriorityDispatchContext> PRIORITY_MAPPER = new Mapper<Integer, PriorityDispatchContext>() {
+        public Integer map(PriorityDispatchContext element) {
+            return element.listPrio;
+        }
+    };
+
+    protected DispatcherThread(AdvancedDispatchSPI spi, String name, int priorities) {
+        this.name = name;
+        
+        this.dispatchQueues = new ThreadDispatchQueue[3];
+        for (int i = 0; i < 3; i++) {
+            dispatchQueues[i] = new ThreadDispatchQueue(this, DispatchQueuePriority.values()[i]);
+        }
+
+        MAX_USER_PRIORITY = priorities - 1;
+        priorityQueue = new PriorityLinkedList<PriorityDispatchContext>(MAX_USER_PRIORITY + 1, PRIORITY_MAPPER);
+        foreignQueue = createForeignEventQueue();
+        for (int i = 0; i < 2; i++) {
+            foreignQueue[i] = new LinkedNodeList<ForeignEvent>();
+        }
+        this.spi = spi;
+    }
+
+    @SuppressWarnings("unchecked")
+    private LinkedNodeList<ForeignEvent>[] createForeignEventQueue() {
+        return new LinkedNodeList[2];
+    }
+
+    protected abstract class ForeignEvent extends LinkedNode<ForeignEvent> {
+        public abstract void execute();
+
+        final void addToList() {
+            synchronized (foreignQueue) {
+                if (!this.isLinked()) {
+                    foreignQueue[foreignToggle].addLast(this);
+                    if (!foreignAvailable.getAndSet(true)) {
+                        wakeup();
+                    }
+                }
+            }
+        }
+    }
+
+    public boolean isThreaded() {
+        return threaded;
+    }
+
+    public void setThreaded(boolean threaded) {
+        this.threaded = threaded;
+    }
+
+    public int getDispatchPriorities() {
+        return MAX_USER_PRIORITY;
+    }
+
+    private class UpdateEvent extends ForeignEvent {
+        private final PriorityDispatchContext pdc;
+
+        UpdateEvent(PriorityDispatchContext pdc) {
+            this.pdc = pdc;
+        }
+
+        // Can only be called by the owner of this dispatch context:
+        public void execute() {
+            pdc.processForeignUpdates();
+        }
+    }
+
+    public DispatchContext register(Runnable runnable, String name) {
+        return new PriorityDispatchContext(runnable, true, name);
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.activemq.dispatch.IDispatcher#start()
+     */
+    public synchronized final void start() {
+        if (thread == null) {
+            running = true;
+            thread = new Thread(this, name);
+            thread.start();
+        }
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.activemq.dispatch.IDispatcher#shutdown()
+     */
+    public void shutdown() throws InterruptedException {
+        Thread joinThread = shutdown(new AtomicInteger(1), null);
+        if (joinThread != null) {
+            // thread.interrupt();
+            joinThread.join();
+        }
+    }
+    
+    public Thread shutdown(final AtomicInteger shutdownCountDown, final Runnable onShutdown) {
+        synchronized (this) {
+            if (thread != null) {
+                dispatchInternal(new Runnable() {
+                    public void run() {
+                        running = false;
+                        if( shutdownCountDown.decrementAndGet()==0 && onShutdown!=null) {
+                            onShutdown.run();
+                        }
+                    }
+                }, MAX_USER_PRIORITY + 1);
+                Thread rc = thread;
+                thread = null;
+                return rc;
+            } else {
+                if( shutdownCountDown.decrementAndGet()==0 && onShutdown!=null) {
+                    onShutdown.run();
+                }
+            }
+            return null;
+        }
+    }
+
+    protected void cleanup() {
+        ArrayList<PriorityDispatchContext> toClose = null;
+        synchronized (this) {
+            running = false;
+            toClose = new ArrayList<PriorityDispatchContext>(contexts.size());
+            toClose.addAll(contexts);
+        }
+
+        for (PriorityDispatchContext context : toClose) {
+            context.close(false);
+        }
+    }
+
+    public void run() {
+
+        if (spi != null) {
+            // Inform the dispatcher that we have started:
+            spi.onDispatcherStarted((DispatcherThread) this);
+        }
+
+        PriorityDispatchContext pdc;
+        try {
+            while (running) {
+                int counter = 0;
+                // If no local work available wait for foreign work:
+                while((pdc = priorityQueue.poll())!=null){
+                    if( pdc.priority < dispatchQueues.length ) {
+                        DispatchSystem.CURRENT_QUEUE.set(dispatchQueues[pdc.priority]);
+                    }
+                    
+                    if (pdc.tracker != null) {
+                        spi.setCurrentDispatchContext(pdc);
+                    }
+
+                    counter++;
+                    pdc.run();
+
+                    if (pdc.tracker != null) {
+                        spi.setCurrentDispatchContext(null);
+                    }
+                }
+
+                if( counter==0 ) {
+                    waitForEvents();
+                }
+
+                // Execute delayed events:
+                timerHeap.executeReadyTimers();
+
+                // Allow subclasses to do additional work:
+                dispatchHook();
+
+                // Check for foreign dispatch requests:
+                if (foreignAvailable.get()) {
+                    LinkedNodeList<ForeignEvent> foreign;
+                    synchronized (foreignQueue) {
+                        // Swap foreign queues and drain permits;
+                        foreign = foreignQueue[foreignToggle];
+                        foreignToggle = TOGGLE[foreignToggle];
+                        foreignAvailable.set(false);
+                        foreignPermits.drainPermits();
+                    }
+                    while (true) {
+                        ForeignEvent fe = foreign.getHead();
+                        if (fe == null) {
+                            break;
+                        }
+
+                        fe.unlink();
+                        fe.execute();
+                    }
+                }
+            }
+        } catch (InterruptedException e) {
+            return;
+        } catch (Throwable thrown) {
+            thrown.printStackTrace();
+        } finally {
+            if (spi != null) {
+                spi.onDispatcherStopped((DispatcherThread) this);
+            }
+            cleanup();
+        }
+    }
+
+    /**
+     * Subclasses may override this to do do additional dispatch work:
+     */
+    protected void dispatchHook() throws Exception {
+
+    }
+
+    /**
+     * Subclasses may override this to implement another mechanism for wakeup.
+     * 
+     * @throws Exception
+     */
+    protected void waitForEvents() throws Exception {
+        long next = timerHeap.timeToNext(TimeUnit.NANOSECONDS);
+        if (next == -1) {
+            foreignPermits.acquire();
+        } else if (next > 0) {
+            foreignPermits.tryAcquire(next, TimeUnit.NANOSECONDS);
+        }
+    }
+
+    /**
+     * Subclasses may override this to provide an alternative wakeup mechanism.
+     */
+    protected void wakeup() {
+        foreignPermits.release();
+    }
+
+    protected final void onForeignUpdate(PriorityDispatchContext context) {
+        synchronized (foreignQueue) {
+
+            ForeignEvent fe = context.updateEvent[foreignToggle];
+            if (!fe.isLinked()) {
+                foreignQueue[foreignToggle].addLast(fe);
+                if (!foreignAvailable.getAndSet(true)) {
+                    wakeup();
+                }
+            }
+        }
+    }
+
+    protected final boolean removeDispatchContext(PriorityDispatchContext context) {
+        synchronized (foreignQueue) {
+
+            if (context.updateEvent[0].isLinked()) {
+                context.updateEvent[0].unlink();
+            }
+            if (context.updateEvent[1].isLinked()) {
+                context.updateEvent[1].unlink();
+            }
+        }
+
+        if (context.isLinked()) {
+            context.unlink();
+            return true;
+        }
+
+        synchronized (this) {
+            contexts.remove(context);
+        }
+
+        return false;
+    }
+
+    protected final boolean takeOwnership(PriorityDispatchContext context) {
+        synchronized (this) {
+            if (running) {
+                contexts.add(context);
+            } else {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    //Special dispatch method that allow high priority dispatch:
+    private final void dispatchInternal(Runnable runnable, int priority) {
+        PriorityDispatchContext context = new PriorityDispatchContext(runnable, false, name);
+        context.priority = priority;
+        context.requestDispatch();
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see
+     * org.apache.activemq.dispatch.IDispatcher#dispatch(org.apache.activemq
+     * .dispatch.Dispatcher.Dispatchable)
+     */
+    public final void dispatch(Runnable runnable, int priority) {
+        PriorityDispatchContext context = new PriorityDispatchContext(runnable, false, name);
+        context.updatePriority(priority);
+        context.requestDispatch();
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.activemq.dispatch.IDispatcher#createPriorityExecutor(int)
+     */
+    public Executor createPriorityExecutor(final int priority) {
+
+        return new Executor() {
+
+            public void execute(final Runnable runnable) {
+                dispatch(runnable, priority);
+            }
+        };
+    }
+
+    public void execute(final Runnable runnable) {
+        dispatch(runnable, 0);
+    }
+    
+    public void execute(final Runnable runnable, int prio) {
+        dispatch(runnable, prio);
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see
+     * org.apache.activemq.dispatch.IDispatcher#schedule(java.lang.Runnable,
+     * long, java.util.concurrent.TimeUnit)
+     */
+    public void schedule(final Runnable runnable, final long delay, final TimeUnit timeUnit) {
+        schedule(runnable, 0, delay, timeUnit);
+    }
+    
+    public void schedule(final Runnable runnable, final int prio, final long delay, final TimeUnit timeUnit) {
+        final Runnable wrapper = new Runnable() {
+            public void run() {
+                execute(runnable, prio);
+            }
+        };
+        if (getCurrentDispatcher() == this) {
+            timerHeap.addRelative(wrapper, delay, timeUnit);
+        } else {
+            new ForeignEvent() {
+                public void execute() {
+                    timerHeap.addRelative(wrapper, delay, timeUnit);
+                }
+            }.addToList();
+        }
+    }
+
+    public String toString() {
+        return name;
+    }
+
+    private final DispatcherThread getCurrentDispatcher() {
+        if (spi != null) {
+            return (DispatcherThread) spi.getCurrentDispatcher();
+        } else if (Thread.currentThread() == thread) {
+            return (DispatcherThread) this;
+        } else {
+            return null;
+        }
+
+    }
+
+    private final DispatchContext getCurrentDispatchContext() {
+        return spi.getCurrentDispatchContext();
+    }
+
+    /**
+     * 
+     */
+    protected class PriorityDispatchContext extends LinkedNode<PriorityDispatchContext> implements DispatchContext {
+        // The target:
+        private final Runnable runnable;
+        // The name of this context:
+        final String name;
+        // list prio can only be updated in the thread of of the owning
+        // dispatcher
+        protected int listPrio;
+
+        // The update events are used to update fields in the dispatch context
+        // from foreign threads:
+        final UpdateEvent updateEvent[];
+
+        private final DispatchObserver tracker;
+        protected DispatcherThread currentOwner;
+        private DispatcherThread updateDispatcher = null;
+
+        private int priority;
+        private boolean dispatchRequested = false;
+        private boolean closed = false;
+        final CountDownLatch closeLatch = new CountDownLatch(1);
+
+        protected PriorityDispatchContext(Runnable runnable, boolean persistent, String name) {
+            this.runnable = runnable;
+            this.name = name;
+            this.currentOwner = (DispatcherThread) DispatcherThread.this;
+            if (persistent && spi != null) {
+                this.tracker = spi.getLoadBalancer().createExecutionTracker((DispatchContext) this);
+            } else {
+                this.tracker = null;
+            }
+            updateEvent = createUpdateEvent();
+            updateEvent[0] = new UpdateEvent(this);
+            updateEvent[1] = new UpdateEvent(this);
+            if (persistent) {
+                currentOwner.takeOwnership(this);
+            }
+        }
+
+        private final DispatcherThread.UpdateEvent[] createUpdateEvent() {
+            return new DispatcherThread.UpdateEvent[2];
+        }
+
+        /**
+         * Gets the execution tracker for the context.
+         * 
+         * @return the execution tracker for the context:
+         */
+        public DispatchObserver getExecutionTracker() {
+            return tracker;
+        }
+
+        /**
+         * This can only be called by the owning dispatch thread:
+         * 
+         * @return False if the dispatchable has more work to do.
+         */
+        public final void run() {
+            runnable.run();
+        }
+
+        public final void setTargetQueue(DispatcherThread newDispatcher) {
+            synchronized (this) {
+
+                // If we're already set to this dispatcher
+                if (newDispatcher == currentOwner) {
+                    if (updateDispatcher == null || updateDispatcher == newDispatcher) {
+                        return;
+                    }
+                }
+
+                updateDispatcher = (DispatcherThread) newDispatcher;
+                if (DEBUG)
+                    System.out.println(getName() + " updating to " + updateDispatcher);
+
+                currentOwner.onForeignUpdate(this);
+            }
+
+        }
+
+        public void requestDispatch() {
+
+            DispatcherThread callingDispatcher = getCurrentDispatcher();
+            if (tracker != null)
+                tracker.onDispatch(callingDispatcher, getCurrentDispatchContext());
+
+            // Otherwise this is coming off another thread, so we need to
+            // synchronize
+            // to protect against ownership changes:
+            synchronized (this) {
+                // If the owner of this context is the calling thread, then
+                // delegate to the dispatcher.
+                if (currentOwner == callingDispatcher) {
+
+                    if (!currentOwner.running) {
+                        // TODO In the event that the current dispatcher
+                        // failed due to a runtime exception, we could
+                        // try to switch to a new dispatcher.
+                        throw new RejectedExecutionException();
+                    }
+                    if (!isLinked()) {
+                        currentOwner.priorityQueue.add(this, listPrio);
+                    }
+                    return;
+                }
+
+                dispatchRequested = true;
+                currentOwner.onForeignUpdate(this);
+            }
+        }
+
+        public void updatePriority(int priority) {
+
+            if (closed) {
+                return;
+            }
+
+            priority = Math.min(priority, MAX_USER_PRIORITY);
+
+            if (this.priority == priority) {
+                return;
+            }
+            DispatcherThread callingDispatcher = getCurrentDispatcher();
+
+            // Otherwise this is coming off another thread, so we need to
+            // synchronize to protect against ownership changes:
+            synchronized (this) {
+                if (closed) {
+                    return;
+                }
+                this.priority = priority;
+
+                // If this is called by the owning dispatcher, then we go ahead
+                // and update:
+                if (currentOwner == callingDispatcher) {
+
+                    if (priority != listPrio) {
+
+                        listPrio = priority;
+                        // If there is a priority change relink the context
+                        // at the new priority:
+                        if (isLinked()) {
+                            unlink();
+                            currentOwner.priorityQueue.add(this, listPrio);
+                        }
+                    }
+                    return;
+                }
+
+                currentOwner.onForeignUpdate(this);
+            }
+
+        }
+
+        public void processForeignUpdates() {
+            synchronized (this) {
+
+                if (closed) {
+                    close(false);
+                    return;
+                }
+
+                if (updateDispatcher != null && updateDispatcher.takeOwnership(this)) {
+                    if (DEBUG) {
+                        System.out.println("Assigning " + getName() + " to " + updateDispatcher);
+                    }
+
+                    if (currentOwner.removeDispatchContext(this)) {
+                        dispatchRequested = true;
+                    }
+
+                    updateDispatcher.onForeignUpdate(this);
+                    switchedDispatcher(currentOwner, updateDispatcher);
+                    currentOwner = updateDispatcher;
+                    updateDispatcher = null;
+
+                } else {
+                    updatePriority(priority);
+
+                    if (dispatchRequested) {
+                        dispatchRequested = false;
+                        requestDispatch();
+                    }
+                }
+            }
+        }
+
+        /**
+         * May be overriden by subclass to additional work on dispatcher switch
+         * 
+         * @param oldDispatcher The old dispatcher
+         * @param newDispatcher The new Dispatcher
+         */
+        protected void switchedDispatcher(DispatcherThread oldDispatcher, DispatcherThread newDispatcher) {
+
+        }
+
+        public boolean isClosed() {
+            return closed;
+        }
+
+        public void close(boolean sync) {
+            DispatcherThread callingDispatcher = getCurrentDispatcher();
+            // System.out.println(this + "Closing");
+            synchronized (this) {
+                closed = true;
+                // If the owner of this context is the calling thread, then
+                // delegate to the dispatcher.
+                if (currentOwner == callingDispatcher) {
+                    removeDispatchContext(this);
+                    closeLatch.countDown();
+                    return;
+                }
+            }
+
+            currentOwner.onForeignUpdate(this);
+            if (sync) {
+                boolean interrupted = false;
+                while (true) {
+                    try {
+                        closeLatch.await();
+                        break;
+                    } catch (InterruptedException e) {
+                        interrupted = true;
+                    }
+                }
+
+                if (interrupted) {
+                    Thread.currentThread().interrupt();
+                }
+            }
+        }
+
+        public final String toString() {
+            return getName();
+        }
+
+        public DispatcherThread getTargetQueue() {
+            return currentOwner;
+        }
+
+        public String getName() {
+            return name;
+        }
+
+    }
+
+    public String getName() {
+        return name;
+    }
+
+}

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/SimpleLoadBalancer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/SimpleLoadBalancer.java?rev=887337&r1=887336&r2=887337&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/SimpleLoadBalancer.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/SimpleLoadBalancer.java Fri Dec  4 19:57:51 2009
@@ -23,6 +23,7 @@
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.activemq.dispatch.DispatchObserver;
+import org.apache.activemq.dispatch.LoadBalancer;
 
 
 public class SimpleLoadBalancer implements LoadBalancer {
@@ -32,7 +33,7 @@
     //TODO: Added plumbing for periodic rebalancing which we should
     //consider implementing
     private static final boolean ENABLE_UPDATES = false;
-    private final ArrayList<Dispatcher> dispatchers = new ArrayList<Dispatcher>();
+    private final ArrayList<DispatcherThread> dispatchers = new ArrayList<DispatcherThread>();
 
     private AtomicBoolean running = new AtomicBoolean(false);
     private boolean needsUpdate = false;
@@ -87,7 +88,7 @@
         running.compareAndSet(true, false);
     }
 
-    public synchronized final void onDispatcherStarted(Dispatcher dispatcher) {
+    public synchronized final void onDispatcherStarted(DispatcherThread dispatcher) {
         dispatchers.add(dispatcher);
         scheduleNext();
     }
@@ -95,7 +96,7 @@
     /**
      * A Dispatcher must call this when exiting it's dispatch loop
      */
-    public void onDispatcherStopped(Dispatcher dispatcher) {
+    public void onDispatcherStopped(DispatcherThread dispatcher) {
         dispatchers.remove(dispatcher);
     }
 
@@ -124,7 +125,7 @@
         private final AtomicInteger work = new AtomicInteger(0);
 
         private DispatchContext singleSource;
-        private Dispatcher currentOwner;
+        private DispatcherThread currentOwner;
 
         SimpleExecutionTracker(DispatchContext context) {
             this.context = context;
@@ -145,7 +146,7 @@
          * @return True if this method resulted in the dispatch request being
          *         assigned to another dispatcher.
          */
-        public void onDispatch(Dispatcher callingDispatcher, DispatchContext callingContext) {
+        public void onDispatch(DispatcherThread callingDispatcher, DispatchContext callingContext) {
 
             if (callingContext != null) {
                 // Make sure we are being called by another node: