You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/12/06 17:28:36 UTC
svn commit: r887707 - in /activemq/sandbox/activemq-apollo-actor:
activemq-broker/src/main/java/org/apache/activemq/apollo/broker/
activemq-broker/src/test/java/org/apache/activemq/broker/
activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/...
Author: chirino
Date: Sun Dec 6 16:28:35 2009
New Revision: 887707
URL: http://svn.apache.org/viewvc?rev=887707&view=rev
Log:
Dispatcher is now using the Retained interface to manage it's lifecycle.
Modified:
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Broker.java
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/SharedQueueTest.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/Dispatcher.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/Retained.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/BaseRetained.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/SerialDispatchQueue.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdvancedDispatcher.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherThread.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/GlobalDispatchQueue.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/ThreadDispatchQueue.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/GlobalDispatchQueue.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleDispatcher.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/ThreadDispatchQueue.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/actor/ActorTest.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/DispatchSystemTest.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/internal/advanced/DispatcherPoolTest.java
activemq/sandbox/activemq-apollo-actor/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java
activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/AbstractFlowQueue.java
activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockBroker.java
activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockBrokerTest.java
activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockClient.java
activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Broker.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Broker.java?rev=887707&r1=887706&r2=887707&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Broker.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Broker.java Sun Dec 6 16:28:35 2009
@@ -138,7 +138,7 @@
throw new IllegalStateException("Can only start a broker that is in the "+State.CONFIGURATION +" state. Broker was "+state.get());
}
try {
- dispatcher.start();
+ dispatcher.retain();
synchronized(virtualHosts) {
for (VirtualHost virtualHost : virtualHosts.values()) {
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java?rev=887707&r1=887706&r2=887707&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java Sun Dec 6 16:28:35 2009
@@ -97,7 +97,7 @@
@Before
public void setUp() throws Exception {
dispatcher = createDispatcher();
- dispatcher.start();
+ dispatcher.retain();
if (tcp) {
sendBrokerBindURI = "tcp://localhost:10000?wireFormat=" + getBrokerWireFormat();
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/SharedQueueTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/SharedQueueTest.java?rev=887707&r1=887706&r2=887707&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/SharedQueueTest.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/SharedQueueTest.java Sun Dec 6 16:28:35 2009
@@ -64,7 +64,7 @@
protected void startServices() throws Exception {
dispatcher = createDispatcher();
- dispatcher.start();
+ dispatcher.retain();
database = new BrokerDatabase(createStore());
database.setDispatcher(dispatcher);
database.start();
Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/Dispatcher.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/Dispatcher.java?rev=887707&r1=887706&r2=887707&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/Dispatcher.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/Dispatcher.java Sun Dec 6 16:28:35 2009
@@ -22,9 +22,6 @@
public interface Dispatcher extends Retained {
- public void start();
- public void shutdown(Runnable onShutdown);
-
public DispatchQueue getGlobalQueue();
public DispatchQueue getGlobalQueue(DispatchPriority priority);
Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/Retained.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/Retained.java?rev=887707&r1=887706&r2=887707&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/Retained.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/Retained.java Sun Dec 6 16:28:35 2009
@@ -24,6 +24,6 @@
public void retain();
public void release();
- public void setShutdownHandler(Runnable shutdownHandler);
+ public void addShutdownWatcher(Runnable shutdownWatcher);
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/BaseRetained.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/BaseRetained.java?rev=887707&r1=887706&r2=887707&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/BaseRetained.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/BaseRetained.java Sun Dec 6 16:28:35 2009
@@ -1,28 +1,49 @@
package org.apache.activemq.dispatch.internal;
+import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
public class BaseRetained {
- final protected AtomicInteger reatinCounter = new AtomicInteger(1);
- final protected AtomicReference<Runnable> shutdownHandler = new AtomicReference<Runnable>();
+ final protected AtomicInteger reatinCounter = new AtomicInteger(0);
+ final protected ArrayList<Runnable> shutdownHandlers = new ArrayList<Runnable>();
- public void setShutdownHandler(Runnable finalizer) {
- this.shutdownHandler.set(finalizer);
+ public void addShutdownWatcher(Runnable shutdownHandler) {
+ synchronized(shutdownHandlers) {
+ shutdownHandlers.add(shutdownHandler);
+ }
}
public void retain() {
- int prev = reatinCounter.getAndIncrement();
- assert prev!=0;
+ if( reatinCounter.getAndIncrement() == 0 ) {
+ startup();
+ }
}
public void release() {
if( reatinCounter.decrementAndGet()==0 ) {
- Runnable value = shutdownHandler.getAndSet(null);
- if( value!=null ) {
- value.run();
- }
+ shutdown();
+ }
+ }
+
+ /**
+ * Subclasses should override if they want to do some startup processing.
+ */
+ protected void startup() {
+ }
+
+
+ /**
+ * Subclasses should override if they want to do clean up.
+ */
+ protected void shutdown() {
+ ArrayList<Runnable> copy;
+ synchronized(shutdownHandlers) {
+ copy = new ArrayList<Runnable>(shutdownHandlers);
+ shutdownHandlers.clear();
+ }
+ for (Runnable runnable : copy) {
+ runnable.run();
}
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/SerialDispatchQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/SerialDispatchQueue.java?rev=887707&r1=887706&r2=887707&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/SerialDispatchQueue.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/SerialDispatchQueue.java Sun Dec 6 16:28:35 2009
@@ -22,12 +22,6 @@
import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.dispatch.DispatchQueue;
-import org.apache.activemq.dispatch.Dispatcher;
-import org.apache.activemq.dispatch.DispatcherFactory;
-
-import static org.apache.activemq.dispatch.DispatcherFactory.*;
-
-import static org.apache.activemq.dispatch.DispatcherFactory.*;
/**
*
@@ -44,6 +38,7 @@
public SerialDispatchQueue(String label) {
this.label = label;
+ retain();
}
public String getLabel() {
Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdvancedDispatcher.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdvancedDispatcher.java?rev=887707&r1=887706&r2=887707&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdvancedDispatcher.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdvancedDispatcher.java Sun Dec 6 16:28:35 2009
@@ -34,7 +34,7 @@
import static org.apache.activemq.dispatch.DispatchPriority.*;
-public class AdvancedDispatcher extends BaseRetained implements Dispatcher {
+final public class AdvancedDispatcher extends BaseRetained implements Dispatcher {
public final static ThreadLocal<DispatchQueue> CURRENT_QUEUE = new ThreadLocal<DispatchQueue>();
@@ -44,10 +44,6 @@
private final ArrayList<DispatcherThread> dispatchers = new ArrayList<DispatcherThread>();
- final AtomicInteger startCounter = new AtomicInteger();
-// final AtomicBoolean started = new AtomicBoolean();
-// final AtomicBoolean shutdown = new AtomicBoolean();
-
private int roundRobinCounter = 0;
private int size;
private final int numPriorities;
@@ -67,54 +63,32 @@
}
/**
- * Subclasses should implement this to return a new dispatcher.
- *
- * @param name
- * The name to assign the dispatcher.
- * @param pool
- * The pool.
- * @return The new dispathcer.
- */
- protected DispatcherThread createDispatcher(String name) throws Exception {
- return new DispatcherThread(this, name, numPriorities);
- }
-
- /**
* @see org.apache.activemq.dispatch.internal.advanced.DispatcherThread#start()
*/
- public synchronized final void start() {
- if( startCounter.getAndIncrement()==0 ) {
- // Create all the workers.
- try {
- loadBalancer.start();
- for (int i = 0; i < size; i++) {
- DispatcherThread dispatacher = createDispatcher("dispatcher -" + (i + 1));
- dispatchers.add(dispatacher);
- dispatacher.start();
- }
- } catch (Exception e) {
- shutdown();
- }
+ protected void startup() {
+ loadBalancer.start();
+ for (int i = 0; i < size; i++) {
+ DispatcherThread dispatacher = new DispatcherThread(this, ("dispatcher -" + (i + 1)), numPriorities);
+ dispatchers.add(dispatacher);
+ dispatacher.start();
}
}
-
- public final void shutdown() {
- shutdown(null);
- }
- /*
- * (non-Javadoc)
- *
- * @see org.apache.activemq.dispatch.IDispatcher#shutdown()
- */
- public final void shutdown(Runnable onShutdown) {
- if( startCounter.decrementAndGet()==0 ) {
- final AtomicInteger shutdownCountDown = new AtomicInteger(dispatchers.size());
- for (DispatcherThread d : new ArrayList<DispatcherThread>(dispatchers)) {
- d.shutdown(shutdownCountDown, onShutdown);
+ protected void shutdown() {
+ Runnable countDown = new Runnable() {
+ AtomicInteger shutdownCountDown = new AtomicInteger(dispatchers.size());
+ public void run() {
+ if( shutdownCountDown.decrementAndGet()==0 ) {
+ // Notify any registered shutdown watchers.
+ AdvancedDispatcher.super.shutdown();
+ }
}
- loadBalancer.stop();
+ };
+
+ for (DispatcherThread d : new ArrayList<DispatcherThread>(dispatchers)) {
+ d.shutdown(countDown);
}
+ loadBalancer.stop();
}
/**
Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherThread.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherThread.java?rev=887707&r1=887706&r2=887707&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherThread.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherThread.java Sun Dec 6 16:28:35 2009
@@ -22,7 +22,6 @@
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.dispatch.DispatchPriority;
import org.apache.activemq.util.Mapper;
@@ -30,8 +29,6 @@
import org.apache.activemq.util.TimerHeap;
import org.apache.activemq.util.list.LinkedNodeList;
-import static org.apache.activemq.dispatch.DispatcherFactory.*;
-
public class DispatcherThread implements Runnable {
static public final ThreadLocal<DispatcherThread> CURRENT = new ThreadLocal<DispatcherThread>();
@@ -124,27 +121,14 @@
thread.start();
}
}
-
- /*
- * (non-Javadoc)
- *
- * @see org.apache.activemq.dispatch.IDispatcher#shutdown()
- */
- public void shutdown() throws InterruptedException {
- Thread joinThread = shutdown(new AtomicInteger(1), null);
- if (joinThread != null) {
- // thread.interrupt();
- joinThread.join();
- }
- }
- public Thread shutdown(final AtomicInteger shutdownCountDown, final Runnable onShutdown) {
+ public Thread shutdown(final Runnable onShutdown) {
synchronized (this) {
if (thread != null) {
dispatchInternal(new Runnable() {
public void run() {
running = false;
- if( shutdownCountDown.decrementAndGet()==0 && onShutdown!=null) {
+ if( onShutdown!=null ) {
onShutdown.run();
}
}
@@ -153,7 +137,7 @@
thread = null;
return rc;
} else {
- if( shutdownCountDown.decrementAndGet()==0 && onShutdown!=null) {
+ if( onShutdown!=null) {
onShutdown.run();
}
}
@@ -268,7 +252,7 @@
foreignPermits.release();
}
- protected final void onForeignUpdate(DispatchContext context) {
+ protected void onForeignUpdate(DispatchContext context) {
synchronized (foreignQueue) {
ForeignEvent fe = context.updateEvent[foreignToggle];
@@ -281,7 +265,7 @@
}
}
- protected final boolean removeDispatchContext(DispatchContext context) {
+ protected boolean removeDispatchContext(DispatchContext context) {
synchronized (foreignQueue) {
if (context.updateEvent[0].isLinked()) {
@@ -304,7 +288,7 @@
return false;
}
- protected final boolean takeOwnership(DispatchContext context) {
+ protected boolean takeOwnership(DispatchContext context) {
synchronized (this) {
if (running) {
contexts.add(context);
@@ -316,13 +300,13 @@
}
//Special dispatch method that allow high priority dispatch:
- private final void dispatchInternal(Runnable runnable, int priority) {
+ private void dispatchInternal(Runnable runnable, int priority) {
DispatchContext context = new DispatchContext(this, runnable, false, name);
context.priority = priority;
context.requestDispatch();
}
- public final void dispatch(Runnable runnable, int priority) {
+ public void dispatch(Runnable runnable, int priority) {
DispatchContext context = new DispatchContext(this, runnable, false, name);
context.updatePriority(priority);
context.requestDispatch();
@@ -343,11 +327,11 @@
};
}
- public void execute(final Runnable runnable) {
+ public void execute(Runnable runnable) {
dispatch(runnable, 0);
}
- public void execute(final Runnable runnable, int prio) {
+ public void execute(Runnable runnable, int prio) {
dispatch(runnable, prio);
}
@@ -368,7 +352,7 @@
}
}
- final void add(ForeignEvent event) {
+ void add(ForeignEvent event) {
synchronized (foreignQueue) {
if (!event.isLinked()) {
foreignQueue[foreignToggle].addLast(event);
Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/GlobalDispatchQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/GlobalDispatchQueue.java?rev=887707&r1=887706&r2=887707&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/GlobalDispatchQueue.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/GlobalDispatchQueue.java Sun Dec 6 16:28:35 2009
@@ -78,7 +78,7 @@
throw new UnsupportedOperationException();
}
- public void setShutdownHandler(Runnable finalizer) {
+ public void addShutdownWatcher(Runnable finalizer) {
throw new UnsupportedOperationException();
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/ThreadDispatchQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/ThreadDispatchQueue.java?rev=887707&r1=887706&r2=887707&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/ThreadDispatchQueue.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/ThreadDispatchQueue.java Sun Dec 6 16:28:35 2009
@@ -78,7 +78,7 @@
throw new UnsupportedOperationException();
}
- public void setShutdownHandler(Runnable finalizer) {
+ public void addShutdownWatcher(Runnable finalizer) {
throw new UnsupportedOperationException();
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/GlobalDispatchQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/GlobalDispatchQueue.java?rev=887707&r1=887706&r2=887707&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/GlobalDispatchQueue.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/GlobalDispatchQueue.java Sun Dec 6 16:28:35 2009
@@ -85,7 +85,7 @@
throw new UnsupportedOperationException();
}
- public void setShutdownHandler(Runnable finalizer) {
+ public void addShutdownWatcher(Runnable finalizer) {
throw new UnsupportedOperationException();
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleDispatcher.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleDispatcher.java?rev=887707&r1=887706&r2=887707&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleDispatcher.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleDispatcher.java Sun Dec 6 16:28:35 2009
@@ -38,7 +38,7 @@
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-public class SimpleDispatcher extends BaseRetained implements Dispatcher {
+final public class SimpleDispatcher extends BaseRetained implements Dispatcher {
public final static ThreadLocal<DispatchQueue> CURRENT_QUEUE = new ThreadLocal<DispatchQueue>();
@@ -49,7 +49,6 @@
final ConcurrentLinkedQueue<DispatcherThread> waitingDispatchers = new ConcurrentLinkedQueue<DispatcherThread>();
final AtomicInteger waitingDispatcherCount = new AtomicInteger();
- final AtomicInteger startCounter = new AtomicInteger();
private final String label;
TimerThread timerThread;
@@ -109,35 +108,32 @@
}
}
- public void start() {
- if( startCounter.getAndIncrement()==0 ) {
- for (int i = 0; i < dispatchers.length; i++) {
- dispatchers[i] = new DispatcherThread(this, i);
- dispatchers[i].start();
- }
- timerThread = new TimerThread(this);
- timerThread.start();
+ protected void startup() {
+ for (int i = 0; i < dispatchers.length; i++) {
+ dispatchers[i] = new DispatcherThread(this, i);
+ dispatchers[i].start();
}
+ timerThread = new TimerThread(this);
+ timerThread.start();
}
- public void shutdown(final Runnable onShutdown) {
- if( startCounter.decrementAndGet()==0 ) {
-
- final AtomicInteger shutdownCountDown = new AtomicInteger(dispatchers.length+1);
- Runnable wrapper = new Runnable() {
- public void run() {
- if( shutdownCountDown.decrementAndGet()==0 && onShutdown!=null) {
- onShutdown.run();
- }
- throw new DispatcherThread.Shutdown();
+ public void shutdown() {
+
+ Runnable countDown = new Runnable() {
+ AtomicInteger shutdownCountDown = new AtomicInteger(dispatchers.length+1);
+ public void run() {
+ if( shutdownCountDown.decrementAndGet()==0 ) {
+ // Notify any registered shutdown watchers.
+ SimpleDispatcher.super.shutdown();
}
- };
-
- timerThread.shutdown(wrapper);
- for (int i = 0; i < dispatchers.length; i++) {
- ThreadDispatchQueue queue = dispatchers[i].threadQueues[LOW.ordinal()];
- queue.runnables.add(wrapper);
+ throw new DispatcherThread.Shutdown();
}
+ };
+
+ timerThread.shutdown(countDown);
+ for (int i = 0; i < dispatchers.length; i++) {
+ ThreadDispatchQueue queue = dispatchers[i].threadQueues[LOW.ordinal()];
+ queue.runnables.add(countDown);
}
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/ThreadDispatchQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/ThreadDispatchQueue.java?rev=887707&r1=887706&r2=887707&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/ThreadDispatchQueue.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/ThreadDispatchQueue.java Sun Dec 6 16:28:35 2009
@@ -112,7 +112,7 @@
throw new UnsupportedOperationException();
}
- public void setShutdownHandler(Runnable finalizer) {
+ public void addShutdownWatcher(Runnable finalizer) {
throw new UnsupportedOperationException();
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/actor/ActorTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/actor/ActorTest.java?rev=887707&r1=887706&r2=887707&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/actor/ActorTest.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/actor/ActorTest.java Sun Dec 6 16:28:35 2009
@@ -46,7 +46,7 @@
public void testActorInvocation() throws Exception
{
Dispatcher advancedSystem = new AdvancedDispatcher(Runtime.getRuntime().availableProcessors(), 3);
- advancedSystem.start();
+ advancedSystem.retain();
DispatchQueue queue = advancedSystem.createSerialQueue("test");
ActorTestObject testObject = Actor.create(new ActorTestObject(), queue);
Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/DispatchSystemTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/DispatchSystemTest.java?rev=887707&r1=887706&r2=887707&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/DispatchSystemTest.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/DispatchSystemTest.java Sun Dec 6 16:28:35 2009
@@ -34,22 +34,24 @@
public static void main(String[] args) throws Exception {
Dispatcher advancedSystem = new AdvancedDispatcher(Runtime.getRuntime().availableProcessors(), 3);
- advancedSystem.start();
+ advancedSystem.retain();
benchmark("advanced global queue", advancedSystem, advancedSystem.getGlobalQueue(DEFAULT));
benchmark("advanced private serial queue", advancedSystem, advancedSystem.createSerialQueue("test"));
RunnableCountDownLatch latch = new RunnableCountDownLatch(1);
- advancedSystem.shutdown(latch);
+ advancedSystem.addShutdownWatcher(latch);
+ advancedSystem.release();
latch.await();
Dispatcher simpleSystem = new SimpleDispatcher("test", Runtime.getRuntime().availableProcessors());
- simpleSystem.start();
+ simpleSystem.retain();
benchmark("simple global queue", simpleSystem, simpleSystem.getGlobalQueue(DEFAULT));
benchmark("simple private serial queue", simpleSystem, simpleSystem.createSerialQueue("test"));
latch = new RunnableCountDownLatch(1);
- simpleSystem.shutdown(latch);
+ advancedSystem.addShutdownWatcher(latch);
+ advancedSystem.release();
latch.await();
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/internal/advanced/DispatcherPoolTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/internal/advanced/DispatcherPoolTest.java?rev=887707&r1=887706&r2=887707&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/internal/advanced/DispatcherPoolTest.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/internal/advanced/DispatcherPoolTest.java Sun Dec 6 16:28:35 2009
@@ -30,21 +30,21 @@
public class DispatcherPoolTest {
public static void main(String[] args) throws Exception {
- AdvancedDispatcher pooledDispatcher = new AdvancedDispatcher(Runtime.getRuntime().availableProcessors(), 3);
- pooledDispatcher.start();
+ AdvancedDispatcher dispatcher = new AdvancedDispatcher(Runtime.getRuntime().availableProcessors(), 3);
+ dispatcher.retain();
// warm the JIT up..
- benchmarkWork(pooledDispatcher, 100000);
+ benchmarkWork(dispatcher, 100000);
int iterations = 1000*1000*20;
long start = System.nanoTime();
- benchmarkWork(pooledDispatcher, iterations);
+ benchmarkWork(dispatcher, iterations);
long end = System.nanoTime();
double durationMS = 1.0d*(end-start)/1000000d;
double rate = 1000d * iterations / durationMS;
- pooledDispatcher.shutdown();
+ dispatcher.release();
System.out.println(format("duration: %,.3f ms, rate: %,.2f executions/sec", durationMS, rate));
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java?rev=887707&r1=887706&r2=887707&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java Sun Dec 6 16:28:35 2009
@@ -90,7 +90,7 @@
protected void startServices() throws Exception {
dispatcher = createDispatcher();
- dispatcher.start();
+ dispatcher.retain();
database = new BrokerDatabase(createStore());
database.setDispatcher(dispatcher);
if( TEST_MAX_STORE_LATENCY ) {
Modified: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/AbstractFlowQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/AbstractFlowQueue.java?rev=887707&r1=887706&r2=887707&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/AbstractFlowQueue.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/AbstractFlowQueue.java Sun Dec 6 16:28:35 2009
@@ -88,7 +88,7 @@
}
stop();
- dispatchQueue.setShutdownHandler(onShutdown);
+ dispatchQueue.addShutdownWatcher(onShutdown);
dispatchQueue.release();
dispatchQueue = null;
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockBroker.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockBroker.java?rev=887707&r1=887706&r2=887707&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockBroker.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockBroker.java Sun Dec 6 16:28:35 2009
@@ -210,7 +210,7 @@
broker.setName("Broker");
broker.createDispatcher();
try {
- broker.getDispatcher().start();
+ broker.getDispatcher().retain();
broker.startServices();
} catch (Exception e) {
// TODO Auto-generated catch block
Modified: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockBrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockBrokerTest.java?rev=887707&r1=887706&r2=887707&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockBrokerTest.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockBrokerTest.java Sun Dec 6 16:28:35 2009
@@ -255,7 +255,7 @@
private void createConnections(int destCount) throws Exception {
dispatcher = createDispatcher("BrokerDispatcher");
- dispatcher.start();
+ dispatcher.retain();
if (multibroker) {
sendBroker = createBroker("SendBroker", sendBrokerURI);
@@ -287,7 +287,7 @@
Dispatcher clientDispatcher = null;
if (SEPARATE_CLIENT_DISPATCHER) {
clientDispatcher = createDispatcher("ClientDispatcher");
- clientDispatcher.start();
+ clientDispatcher.retain();
} else {
clientDispatcher = dispatcher;
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockClient.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockClient.java?rev=887707&r1=887706&r2=887707&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockClient.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockClient.java Sun Dec 6 16:28:35 2009
@@ -219,7 +219,7 @@
}
public void runTest() throws Exception {
- getDispatcher().start();
+ getDispatcher().retain();
// Start 'em up.
startServices();
@@ -304,7 +304,7 @@
System.out.println(IntrospectionSupport.toString(test));
try
{
- test.getDispatcher().start();
+ test.getDispatcher().retain();
test.createConnections();
test.runTest();
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java?rev=887707&r1=887706&r2=887707&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java Sun Dec 6 16:28:35 2009
@@ -71,7 +71,7 @@
pipe.write(EOF_TOKEN);
if (dispatchQueue != null) {
RunnableCountDownLatch done = new RunnableCountDownLatch(1);
- dispatchQueue.setShutdownHandler(done);
+ dispatchQueue.addShutdownWatcher(done);
dispatchQueue.release();
done.await();
} else {