You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/12/04 09:57:21 UTC
svn commit: r887117 [1/2] - in /activemq/sandbox/activemq-apollo-actor:
activemq-broker/src/main/java/org/apache/activemq/apollo/
activemq-broker/src/main/java/org/apache/activemq/apollo/broker/
activemq-broker/src/test/java/org/apache/activemq/broker/...
Author: chirino
Date: Fri Dec 4 08:56:50 2009
New Revision: 887117
URL: http://svn.apache.org/viewvc?rev=887117&view=rev
Log:
refacorting the advanced package a bit.
Added:
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/AbstractDispatchObject.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/QueueSupport.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/SerialDispatchQueue.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdancedDispatchSPI.java
- copied, changed from r886965, activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdancedDispatchSystem.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/Dispatcher.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherPool.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherThread.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/LoadBalancer.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/PooledDispatchContext.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/ThreadDispatchQueue.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleDispatchSPI.java
- copied, changed from r886965, activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleDispatchSystem.java
Removed:
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AbstractDispatchObject.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AbstractPooledDispatcher.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdancedDispatchSystem.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/ExecutionLoadBalancer.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/IDispatcher.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/PooledDispatcher.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/PooledPriorityDispatcher.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/PriorityDispatcher.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/QueueSupport.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/SerialDispatchQueue.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/AbstractDispatchObject.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/QueueSupport.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SerialDispatchQueue.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleDispatchSystem.java
Modified:
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/Connection.java
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Broker.java
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerDatabase.java
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerQueueStore.java
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/RemoteProducer.java
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/SharedQueueTest.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchSystem.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherAware.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/GlobalDispatchQueue.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/SimpleLoadBalancer.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/Dispatcher.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/GlobalDispatchQueue.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/DispatchSystemTest.java
activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/main/java/org/apache/activemq/apollo/jaxb/DispatcherXml.java
activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/test/java/org/apache/activemq/apollo/jaxb/JAXBConfigTest.java
activemq/sandbox/activemq-apollo-actor/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java
activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/AbstractFlowQueue.java
activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/IFlowQueue.java
activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/IQueue.java
activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/PartitionedQueue.java
activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/AbstractTestConnection.java
activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockBroker.java
activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockBrokerTest.java
activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockClient.java
activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/RemoteProducer.java
activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/Connection.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/Connection.java?rev=887117&r1=887116&r2=887117&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/Connection.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/Connection.java Fri Dec 4 08:56:50 2009
@@ -25,7 +25,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.Service;
-import org.apache.activemq.dispatch.internal.advanced.IDispatcher;
+import org.apache.activemq.dispatch.internal.advanced.Dispatcher;
import org.apache.activemq.transport.DispatchableTransport;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportListener;
@@ -42,7 +42,7 @@
protected int inputResumeThreshold = 512 * 1024;
protected boolean useAsyncWriteThread = true;
- private IDispatcher dispatcher;
+ private Dispatcher dispatcher;
private final AtomicBoolean stopping = new AtomicBoolean();
private ExecutorService blockingWriter;
private ExceptionListener exceptionListener;
@@ -170,11 +170,11 @@
this.priorityLevels = priorityLevels;
}
- public IDispatcher getDispatcher() {
+ public Dispatcher getDispatcher() {
return dispatcher;
}
- public void setDispatcher(IDispatcher dispatcher) {
+ public void setDispatcher(Dispatcher dispatcher) {
this.dispatcher = dispatcher;
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Broker.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Broker.java?rev=887117&r1=887116&r2=887117&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Broker.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Broker.java Fri Dec 4 08:56:50 2009
@@ -26,8 +26,8 @@
import org.apache.activemq.Service;
import org.apache.activemq.apollo.Connection;
import org.apache.activemq.dispatch.internal.advanced.DispatcherAware;
-import org.apache.activemq.dispatch.internal.advanced.IDispatcher;
-import org.apache.activemq.dispatch.internal.advanced.PriorityDispatcher;
+import org.apache.activemq.dispatch.internal.advanced.Dispatcher;
+import org.apache.activemq.dispatch.internal.advanced.DispatcherThread;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportAcceptListener;
import org.apache.activemq.transport.TransportServer;
@@ -51,7 +51,7 @@
private final LinkedHashMap<AsciiBuffer, VirtualHost> virtualHosts = new LinkedHashMap<AsciiBuffer, VirtualHost>();
private VirtualHost defaultVirtualHost;
- private IDispatcher dispatcher;
+ private Dispatcher dispatcher;
private File dataDirectory;
private final class BrokerAcceptListener implements TransportAcceptListener {
@@ -129,7 +129,7 @@
// apply some default configuration to this broker instance before it's started.
if( dispatcher == null ) {
int threads = Runtime.getRuntime().availableProcessors();
- dispatcher = PriorityDispatcher.createPriorityDispatchPool("Broker: "+getDefaultVirtualHost().getHostName(), Broker.MAX_PRIORITY, threads);
+ dispatcher = DispatcherThread.createPriorityDispatchPool("Broker: "+getDefaultVirtualHost().getHostName(), Broker.MAX_PRIORITY, threads);
}
@@ -376,10 +376,10 @@
// /////////////////////////////////////////////////////////////////
// Property Accessors
// /////////////////////////////////////////////////////////////////
- public IDispatcher getDispatcher() {
+ public Dispatcher getDispatcher() {
return dispatcher;
}
- public void setDispatcher(IDispatcher dispatcher) {
+ public void setDispatcher(Dispatcher dispatcher) {
assertInConfigurationState();
this.dispatcher = dispatcher;
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerDatabase.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerDatabase.java?rev=887117&r1=887116&r2=887117&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerDatabase.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerDatabase.java Fri Dec 4 08:56:50 2009
@@ -41,7 +41,7 @@
import org.apache.activemq.broker.store.Store.QueueRecord;
import org.apache.activemq.broker.store.Store.Session;
import org.apache.activemq.dispatch.internal.advanced.DispatcherAware;
-import org.apache.activemq.dispatch.internal.advanced.IDispatcher;
+import org.apache.activemq.dispatch.internal.advanced.Dispatcher;
import org.apache.activemq.flow.AbstractLimitedFlowResource;
import org.apache.activemq.flow.Flow;
import org.apache.activemq.flow.FlowController;
@@ -71,7 +71,7 @@
private final FlowController<OperationBase<?>> storeController;
private final int FLUSH_QUEUE_SIZE = 10000 * 1024;
- private IDispatcher dispatcher;
+ private Dispatcher dispatcher;
private Thread flushThread;
private AtomicBoolean running = new AtomicBoolean(false);
private DatabaseListener listener;
@@ -1288,11 +1288,11 @@
return store.allocateStoreTracking();
}
- public IDispatcher getDispatcher() {
+ public Dispatcher getDispatcher() {
return dispatcher;
}
- public void setDispatcher(IDispatcher dispatcher) {
+ public void setDispatcher(Dispatcher dispatcher) {
this.dispatcher = dispatcher;
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerQueueStore.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerQueueStore.java?rev=887117&r1=887116&r2=887117&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerQueueStore.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerQueueStore.java Fri Dec 4 08:56:50 2009
@@ -24,7 +24,7 @@
import org.apache.activemq.broker.store.Store.MessageRecord;
import org.apache.activemq.broker.store.Store.QueueQueryResult;
-import org.apache.activemq.dispatch.internal.advanced.IDispatcher;
+import org.apache.activemq.dispatch.internal.advanced.Dispatcher;
import org.apache.activemq.flow.ISourceController;
import org.apache.activemq.flow.PrioritySizeLimiter;
import org.apache.activemq.flow.SizeLimiter;
@@ -52,7 +52,7 @@
private static final boolean USE_PRIORITY_QUEUES = true;
private BrokerDatabase database;
- private IDispatcher dispatcher;
+ private Dispatcher dispatcher;
private static HashMap<String, ProtocolHandler> protocolHandlers = new HashMap<String, ProtocolHandler>();
private static final BrokerDatabase.MessageRecordMarshaller<MessageDelivery> MESSAGE_MARSHALLER = new BrokerDatabase.MessageRecordMarshaller<MessageDelivery>() {
@@ -226,7 +226,7 @@
this.database = database;
}
- public void setDispatcher(IDispatcher dispatcher) {
+ public void setDispatcher(Dispatcher dispatcher) {
this.dispatcher = dispatcher;
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java?rev=887117&r1=887116&r2=887117&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java Fri Dec 4 08:56:50 2009
@@ -30,8 +30,8 @@
import org.apache.activemq.apollo.broker.Router;
import org.apache.activemq.broker.store.Store;
import org.apache.activemq.broker.store.StoreFactory;
-import org.apache.activemq.dispatch.internal.advanced.IDispatcher;
-import org.apache.activemq.dispatch.internal.advanced.PriorityDispatcher;
+import org.apache.activemq.dispatch.internal.advanced.Dispatcher;
+import org.apache.activemq.dispatch.internal.advanced.DispatcherThread;
import org.apache.activemq.metric.MetricAggregator;
import org.apache.activemq.metric.Period;
import org.apache.activemq.transport.TransportFactory;
@@ -87,7 +87,7 @@
protected Broker sendBroker;
protected Broker rcvBroker;
protected ArrayList<Broker> brokers = new ArrayList<Broker>();
- protected IDispatcher dispatcher;
+ protected Dispatcher dispatcher;
protected final AtomicLong msgIdGenerator = new AtomicLong();
protected final AtomicBoolean stopping = new AtomicBoolean();
@@ -134,8 +134,8 @@
protected abstract String getRemoteWireFormat();
- protected IDispatcher createDispatcher() {
- return PriorityDispatcher.createPriorityDispatchPool("BrokerDispatcher", Broker.MAX_PRIORITY, asyncThreadPoolSize);
+ protected Dispatcher createDispatcher() {
+ return DispatcherThread.createPriorityDispatchPool("BrokerDispatcher", Broker.MAX_PRIORITY, asyncThreadPoolSize);
}
@Test
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/RemoteProducer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/RemoteProducer.java?rev=887117&r1=887116&r2=887117&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/RemoteProducer.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/RemoteProducer.java Fri Dec 4 08:56:50 2009
@@ -6,8 +6,8 @@
import org.apache.activemq.apollo.Connection;
import org.apache.activemq.apollo.broker.Destination;
import org.apache.activemq.apollo.broker.MessageDelivery;
-import org.apache.activemq.dispatch.internal.advanced.IDispatcher.DispatchContext;
-import org.apache.activemq.dispatch.internal.advanced.IDispatcher.Dispatchable;
+import org.apache.activemq.dispatch.internal.advanced.Dispatcher.DispatchContext;
+import org.apache.activemq.dispatch.internal.advanced.Dispatcher.Dispatchable;
import org.apache.activemq.flow.IFlowController;
import org.apache.activemq.flow.IFlowSink;
import org.apache.activemq.flow.ISinkController;
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/SharedQueueTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/SharedQueueTest.java?rev=887117&r1=887116&r2=887117&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/SharedQueueTest.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/SharedQueueTest.java Fri Dec 4 08:56:50 2009
@@ -27,8 +27,8 @@
import org.apache.activemq.apollo.broker.MessageDelivery;
import org.apache.activemq.broker.store.Store;
import org.apache.activemq.broker.store.StoreFactory;
-import org.apache.activemq.dispatch.internal.advanced.IDispatcher;
-import org.apache.activemq.dispatch.internal.advanced.PriorityDispatcher;
+import org.apache.activemq.dispatch.internal.advanced.Dispatcher;
+import org.apache.activemq.dispatch.internal.advanced.DispatcherThread;
import org.apache.activemq.queue.IQueue;
/**
@@ -38,7 +38,7 @@
public class SharedQueueTest extends TestCase {
- IDispatcher dispatcher;
+ Dispatcher dispatcher;
BrokerDatabase database;
BrokerQueueStore queueStore;
private static final boolean USE_KAHA_DB = true;
@@ -47,8 +47,8 @@
protected ArrayList<IQueue<Long, MessageDelivery>> queues = new ArrayList<IQueue<Long, MessageDelivery>>();
- protected IDispatcher createDispatcher() {
- return PriorityDispatcher.createPriorityDispatchPool("TestDispatcher", Broker.MAX_PRIORITY, Runtime.getRuntime().availableProcessors());
+ protected Dispatcher createDispatcher() {
+ return DispatcherThread.createPriorityDispatchPool("TestDispatcher", Broker.MAX_PRIORITY, Runtime.getRuntime().availableProcessors());
}
protected int consumerStartDelay = 0;
Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchSystem.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchSystem.java?rev=887117&r1=887116&r2=887117&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchSystem.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchSystem.java Fri Dec 4 08:56:50 2009
@@ -18,7 +18,7 @@
import java.nio.channels.SelectableChannel;
-import org.apache.activemq.dispatch.internal.simple.SimpleDispatchSystem;
+import org.apache.activemq.dispatch.internal.simple.SimpleDispatchSPI;
/**
*
@@ -26,38 +26,48 @@
*/
public class DispatchSystem {
- private static final SimpleDispatchSystem system = new SimpleDispatchSystem(Runtime.getRuntime().availableProcessors());
-
- static DispatchQueue getMainQueue() {
- return system.getMainQueue();
- }
-
public static enum DispatchQueuePriority {
HIGH,
DEFAULT,
LOW;
}
- static public DispatchQueue getGlobalQueue(DispatchQueuePriority priority) {
- return system.getGlobalQueue(priority);
+ static abstract public class DispatchSPI {
+ abstract public DispatchQueue getMainQueue();
+ abstract public DispatchQueue getGlobalQueue(DispatchQueuePriority priority);
+ abstract public DispatchQueue createQueue(String label);
+ abstract public void dispatchMain();
+ abstract public DispatchSource createSource(SelectableChannel channel, int interestOps, DispatchQueue queue);
+ }
+
+ public final static ThreadLocal<DispatchQueue> CURRENT_QUEUE = new ThreadLocal<DispatchQueue>();
+ static public DispatchQueue getCurrentQueue() {
+ return CURRENT_QUEUE.get();
+ }
+
+ private final static DispatchSPI spi = cretateDispatchSystemSPI();
+ private static DispatchSPI cretateDispatchSystemSPI() {
+ return new SimpleDispatchSPI(Runtime.getRuntime().availableProcessors());
}
- static DispatchQueue createQueue(String label) {
- return system.createQueue(label);
+ static DispatchQueue getMainQueue() {
+ return spi.getMainQueue();
}
- static DispatchQueue getCurrentQueue() {
- return system.getCurrentQueue();
+ static public DispatchQueue getGlobalQueue(DispatchQueuePriority priority) {
+ return spi.getGlobalQueue(priority);
+ }
+
+ static DispatchQueue createQueue(String label) {
+ return spi.createQueue(label);
}
static void dispatchMain() {
- system.dispatchMain();
+ spi.dispatchMain();
}
static DispatchSource createSource(SelectableChannel channel, int interestOps, DispatchQueue queue) {
- return system.createSource(channel, interestOps, queue);
+ return spi.createSource(channel, interestOps, queue);
}
-
-
}
Added: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/AbstractDispatchObject.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/AbstractDispatchObject.java?rev=887117&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/AbstractDispatchObject.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/AbstractDispatchObject.java Fri Dec 4 08:56:50 2009
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.dispatch.internal;
+
+import org.apache.activemq.dispatch.DispatchObject;
+import org.apache.activemq.dispatch.DispatchQueue;
+
+/**
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+abstract public class AbstractDispatchObject implements DispatchObject {
+
+ protected Object context;
+ protected Runnable finalizer;
+ protected DispatchQueue targetQueue;
+
+ @SuppressWarnings("unchecked")
+ public <Context> Context getContext() {
+ return (Context) context;
+ }
+
+ public <Context> void setContext(Context context) {
+ this.context = context;
+ }
+
+ public void setFinalizer(Runnable finalizer) {
+ this.finalizer = finalizer;
+ }
+
+ public void setTargetQueue(DispatchQueue targetQueue) {
+ this.targetQueue = targetQueue;
+ }
+
+
+}
Added: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/QueueSupport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/QueueSupport.java?rev=887117&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/QueueSupport.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/QueueSupport.java Fri Dec 4 08:56:50 2009
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.dispatch.internal;
+
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.activemq.dispatch.DispatchQueue;
+
+/**
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class QueueSupport {
+
+ static public void dispatchApply(DispatchQueue queue, int itterations, final Runnable runnable) throws InterruptedException {
+ final CountDownLatch done = new CountDownLatch(itterations);
+ Runnable wrapper = new Runnable() {
+ public void run() {
+ try {
+ runnable.run();
+ } finally {
+ done.countDown();
+ }
+ }
+ };
+ for( int i=0; i < itterations; i++ ) {
+ queue.dispatchAsync(wrapper);
+ }
+ done.await();
+ }
+
+}
Added: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/SerialDispatchQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/SerialDispatchQueue.java?rev=887117&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/SerialDispatchQueue.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/SerialDispatchQueue.java Fri Dec 4 08:56:50 2009
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.dispatch.internal;
+
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.activemq.dispatch.DispatchQueue;
+import org.apache.activemq.dispatch.DispatchSystem;
+
+/**
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class SerialDispatchQueue extends AbstractDispatchObject implements DispatchQueue, Runnable {
+
+ private final ConcurrentLinkedQueue<Runnable> runnables = new ConcurrentLinkedQueue<Runnable>();
+ final private String label;
+ final private AtomicInteger suspendCounter = new AtomicInteger();
+ final private AtomicLong size = new AtomicLong();
+
+ static final ThreadLocal<DispatchQueue> CURRENT_QUEUE = new ThreadLocal<DispatchQueue>();
+
+ public SerialDispatchQueue(String label) {
+ this.label = label;
+ }
+
+ public String getLabel() {
+ return label;
+ }
+
+ public void resume() {
+ suspendCounter.decrementAndGet();
+ }
+
+ public void suspend() {
+ suspendCounter.incrementAndGet();
+ }
+
+ public void dispatchAfter(long delayMS, Runnable runnable) {
+ throw new RuntimeException("TODO: implement me.");
+ }
+
+ public void dispatchAsync(Runnable runnable) {
+ if( runnable == null ) {
+ throw new IllegalArgumentException();
+ }
+ long lastSize = size.incrementAndGet();
+ runnables.add(runnable);
+ if( targetQueue!=null && lastSize == 1 && suspendCounter.get()<=0 ) {
+ targetQueue.dispatchAsync(this);
+ }
+ }
+
+ public void run() {
+ DispatchQueue original = DispatchSystem.CURRENT_QUEUE.get();
+ DispatchSystem.CURRENT_QUEUE.set(this);
+ try {
+ Runnable runnable;
+ long lsize = size.get();
+ while( suspendCounter.get() <= 0 && lsize > 0 ) {
+ try {
+ runnable = runnables.poll();
+ if( runnable!=null ) {
+ runnable.run();
+ lsize = size.decrementAndGet();
+ }
+ } catch (Throwable e) {
+ e.printStackTrace();
+ }
+ }
+ } finally {
+ DispatchSystem.CURRENT_QUEUE.set(original);
+ }
+ }
+
+ public void dispatchSync(Runnable runnable) throws InterruptedException {
+ dispatchApply(1, runnable);
+ }
+
+ public void dispatchApply(int iterations, Runnable runnable) throws InterruptedException {
+ QueueSupport.dispatchApply(this, iterations, runnable);
+ }
+}
Copied: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdancedDispatchSPI.java (from r886965, activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdancedDispatchSystem.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdancedDispatchSPI.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdancedDispatchSPI.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdancedDispatchSystem.java&r1=886965&r2=887117&rev=887117&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdancedDispatchSystem.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdancedDispatchSPI.java Fri Dec 4 08:56:50 2009
@@ -22,6 +22,8 @@
import org.apache.activemq.dispatch.DispatchQueue;
import org.apache.activemq.dispatch.DispatchSource;
import org.apache.activemq.dispatch.DispatchSystem.DispatchQueuePriority;
+import org.apache.activemq.dispatch.DispatchSystem.DispatchSPI;
+import org.apache.activemq.dispatch.internal.SerialDispatchQueue;
import static org.apache.activemq.dispatch.DispatchSystem.DispatchQueuePriority.*;
@@ -31,18 +33,16 @@
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-public class AdancedDispatchSystem {
-
- static final ThreadLocal<DispatchQueue> CURRENT_QUEUE = new ThreadLocal<DispatchQueue>();
-
+public class AdancedDispatchSPI extends DispatchSPI {
+
final SerialDispatchQueue mainQueue = new SerialDispatchQueue("main");
final GlobalDispatchQueue globalQueues[];
- PooledPriorityDispatcher pooledDispatcher;
+ DispatcherPool pooledDispatcher;
final AtomicLong globalQueuedRunnables = new AtomicLong();
- public AdancedDispatchSystem(int size) throws Exception {
- pooledDispatcher = new PooledPriorityDispatcher("default", size, 3);
+ public AdancedDispatchSPI(int size) throws Exception {
+ pooledDispatcher = new DispatcherPool("default", size, 3);
pooledDispatcher.start();
globalQueues = new GlobalDispatchQueue[3];
@@ -65,10 +65,6 @@
return rc;
}
- public DispatchQueue getCurrentQueue() {
- return CURRENT_QUEUE.get();
- }
-
public void dispatchMain() {
mainQueue.run();
}
Added: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/Dispatcher.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/Dispatcher.java?rev=887117&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/Dispatcher.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/Dispatcher.java Fri Dec 4 08:56:50 2009
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.dispatch.internal.advanced;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+
+public interface Dispatcher extends Executor {
+
+ /**
+ * This interface is implemented by Dispatchable entities. A Dispatchable
+ * entity registers with an {@link Dispatcher} and is returned a
+ * {@link DispatchContext} which it can use to request the
+ * {@link Dispatcher} to invoke {@link Dispatchable#dispatch()}
+ *
+ * {@link Dispatcher} guarantees that {@link #dispatch()} will never invoke
+ * dispatch concurrently unless the {@link Dispatchable} is registered with
+ * more than one {@link Dispatcher};
+ */
+ public interface Dispatchable {
+ public boolean dispatch();
+ }
+
+ /**
+ * Returned to callers registered with this dispathcer. Used by the caller
+ * to inform the dispatcher that it is ready for dispatch.
+ *
+ * Note that DispatchContext is not safe for concurrent access by multiple
+ * threads.
+ */
+ public interface DispatchContext {
+ /**
+ * Once registered with a dispatcher, this can be called to request
+ * dispatch. The {@link Dispatchable} will remain in the dispatch queue
+ * until a subsequent call to {@link Dispatchable#dispatch()} returns
+ * false;
+ *
+ * @throws RejectedExecutionException If the dispatcher has been shutdown.
+ */
+ public void requestDispatch() throws RejectedExecutionException;
+
+ /**
+ * This can be called to update the dispatch priority.
+ *
+ * @param priority
+ */
+ public void updatePriority(int priority);
+
+ /**
+ * Gets the Dispatchable that this context represents.
+ *
+ * @return The dispatchable
+ */
+ public Dispatchable getDispatchable();
+
+ /**
+ * Gets the name of the dispatch context
+ *
+ * @return The dispatchable
+ */
+ public String getName();
+
+ /**
+ * This must be called to release any resource the dispatcher is holding
+ * on behalf of this context. Once called this {@link DispatchContext} should
+ * no longer be used.
+ */
+ public void close(boolean sync);
+ }
+
+ public class RunnableAdapter implements Dispatchable, Runnable {
+ private Runnable runnable;
+
+ public RunnableAdapter() {
+ runnable = this;
+ }
+ public RunnableAdapter(Runnable runnable) {
+ this.runnable = runnable;
+ }
+
+ public boolean dispatch() {
+ runnable.run();
+ return true;
+ }
+
+ public void run() {
+ }
+ }
+
+ /**
+ * Registers a {@link Dispatchable} with this dispatcher, and returns a
+ * {@link DispatchContext} that the caller can use to request dispatch.
+ *
+ * @param dispatchable
+ * The {@link Dispatchable}
+ * @param name
+ * An identifier for the dispatcher.
+ * @return A {@link DispatchContext} that can be used to request dispatch
+ */
+ public DispatchContext register(Dispatchable dispatchable, String name);
+
+ /**
+ * Gets the number of dispatch priorities. Dispatch priorities are 0 based,
+ * so if the number of dispatch priorities is 10, the maxium is 9.
+ * @return the number of dispatch priorities.
+ */
+ public int getDispatchPriorities();
+
+ /**
+ * Creates an executor that will execute its tasks at the specified
+ * priority.
+ *
+ * @param priority
+ * The priority
+ * @return A prioritized executor.
+ */
+ public Executor createPriorityExecutor(int priority);
+
+ /**
+ * Starts the dispatcher.
+ */
+ public void start() throws Exception;
+
+ /**
+ * Shuts down the dispatcher, this may result in previous dispatch requests
+ * going unserved.
+ */
+ public void shutdown() throws InterruptedException;
+
+ /**
+ * Schedules the given {@link Runnable} to be run at the specified time in
+ * the future on this {@link Dispatcher}.
+ *
+ * @param runnable
+ * The Runnable to execute
+ * @param delay
+ * The delay
+ * @param timeUnit
+ * The TimeUnit used to interpret delay.
+ */
+ public void schedule(final Runnable runnable, long delay, TimeUnit timeUnit);
+
+}
\ No newline at end of file
Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherAware.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherAware.java?rev=887117&r1=887116&r2=887117&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherAware.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherAware.java Fri Dec 4 08:56:50 2009
@@ -8,6 +8,6 @@
*/
public interface DispatcherAware {
- public void setDispatcher(IDispatcher dispatcher);
+ public void setDispatcher(Dispatcher dispatcher);
}
Added: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherPool.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherPool.java?rev=887117&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherPool.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherPool.java Fri Dec 4 08:56:50 2009
@@ -0,0 +1,211 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.dispatch.internal.advanced;
+
+import java.util.ArrayList;
+import java.util.concurrent.Executor;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class DispatcherPool implements Dispatcher {
+
+ private final String name;
+
+ private final ThreadLocal<DispatcherThread> dispatcher = new ThreadLocal<DispatcherThread>();
+ private final ThreadLocal<PooledDispatchContext> dispatcherContext = new ThreadLocal<PooledDispatchContext>();
+ private final ArrayList<DispatcherThread> dispatchers = new ArrayList<DispatcherThread>();
+
+ final AtomicBoolean started = new AtomicBoolean();
+ final AtomicBoolean shutdown = new AtomicBoolean();
+
+ private int roundRobinCounter = 0;
+ private int size;
+ private final int numPriorities;
+
+ protected LoadBalancer loadBalancer;
+
+ protected DispatcherPool(String name, int size, int numPriorities) {
+ this.name = name;
+ this.size = size;
+ this.numPriorities = numPriorities;
+ loadBalancer = new SimpleLoadBalancer();
+ }
+
+ /**
+ * Subclasses should implement this to return a new dispatcher.
+ *
+ * @param name
+ * The name to assign the dispatcher.
+ * @param pool
+ * The pool.
+ * @return The new dispathcer.
+ */
+ protected DispatcherThread createDispatcher(String name, DispatcherPool pool) throws Exception {
+ return new DispatcherThread(name, numPriorities, this);
+ }
+
+ /**
+ * @see org.apache.activemq.dispatch.internal.advanced.Dispatcher#start()
+ */
+ public synchronized final void start() throws Exception {
+ loadBalancer.start();
+ if (started.compareAndSet(false, true)) {
+ // Create all the workers.
+ try {
+ for (int i = 0; i < size; i++) {
+ DispatcherThread dispatacher = createDispatcher(name + "-" + (i + 1), this);
+ dispatchers.add(dispatacher);
+ dispatacher.start();
+ }
+ } catch (Exception e) {
+ shutdown();
+ }
+ }
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.activemq.dispatch.IDispatcher#shutdown()
+ */
+ public synchronized final void shutdown() throws InterruptedException {
+ shutdown.set(true);
+ boolean interrupted = false;
+ while (!dispatchers.isEmpty()) {
+ try {
+ dispatchers.get(dispatchers.size() - 1).shutdown();
+ } catch (InterruptedException ie) {
+ interrupted = true;
+ continue;
+ }
+ }
+ // Re-interrupt:
+ if (interrupted) {
+ Thread.currentThread().interrupt();
+ }
+
+ loadBalancer.stop();
+ }
+
+ public void setCurrentDispatchContext(PooledDispatchContext context) {
+ dispatcherContext.set(context);
+ }
+
+ public PooledDispatchContext getCurrentDispatchContext() {
+ return dispatcherContext.get();
+ }
+
+ /**
+ * Returns the currently executing dispatcher, or null if the current thread
+ * is not a dispatcher:
+ *
+ * @return The currently executing dispatcher
+ */
+ public Dispatcher getCurrentDispatcher() {
+ return dispatcher.get();
+ }
+
+ /**
+ * A Dispatcher must call this to indicate that is has started it's dispatch
+ * loop.
+ */
+ public void onDispatcherStarted(DispatcherThread d) {
+ dispatcher.set(d);
+ loadBalancer.onDispatcherStarted(d);
+ }
+
+ public LoadBalancer getLoadBalancer() {
+ return loadBalancer;
+ }
+
+ /**
+ * A Dispatcher must call this when exiting it's dispatch loop
+ */
+ public void onDispatcherStopped(Dispatcher d) {
+ synchronized (dispatchers) {
+ if (dispatchers.remove(d)) {
+ size--;
+ }
+ }
+ loadBalancer.onDispatcherStopped(d);
+ }
+
+ protected DispatcherThread chooseDispatcher() {
+ DispatcherThread d = dispatcher.get();
+ if (d == null) {
+ synchronized (dispatchers) {
+ if(dispatchers.isEmpty())
+ {
+ throw new RejectedExecutionException();
+ }
+ if (++roundRobinCounter >= size) {
+ roundRobinCounter = 0;
+ }
+ return dispatchers.get(roundRobinCounter);
+ }
+ } else {
+ return d;
+ }
+ }
+
+ public DispatchContext register(Dispatchable dispatchable, String name) {
+ return chooseDispatcher().register(dispatchable, name);
+ }
+
+ public String toString() {
+ return name;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public int getSize() {
+ return size;
+ }
+
+ public final Executor createPriorityExecutor(final int priority) {
+ return new Executor() {
+ public void execute(final Runnable runnable) {
+ chooseDispatcher().dispatch(new RunnableAdapter(runnable), priority);
+ }
+
+ };
+ }
+
+ public int getDispatchPriorities() {
+ // TODO Auto-generated method stub
+ return numPriorities;
+ }
+
+ public void execute(Runnable command) {
+ chooseDispatcher().dispatch(new RunnableAdapter(command), 0);
+ }
+
+ public void execute(Runnable command, int priority) {
+ chooseDispatcher().dispatch(new RunnableAdapter(command), priority);
+ }
+
+ public void schedule(final Runnable runnable, long delay, TimeUnit timeUnit) {
+ chooseDispatcher().schedule(runnable, delay, timeUnit);
+ }
+
+ public void schedule(final Runnable runnable, int priority, long delay, TimeUnit timeUnit) {
+ chooseDispatcher().schedule(runnable, priority, delay, timeUnit);
+ }
+}
Added: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherThread.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherThread.java?rev=887117&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherThread.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherThread.java Fri Dec 4 08:56:50 2009
@@ -0,0 +1,711 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.dispatch.internal.advanced;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.activemq.dispatch.DispatchSystem;
+import org.apache.activemq.dispatch.DispatchSystem.DispatchQueuePriority;
+import org.apache.activemq.dispatch.internal.advanced.LoadBalancer.ExecutionTracker;
+import org.apache.activemq.util.Mapper;
+import org.apache.activemq.util.PriorityLinkedList;
+import org.apache.activemq.util.TimerHeap;
+import org.apache.activemq.util.list.LinkedNode;
+import org.apache.activemq.util.list.LinkedNodeList;
+
+public class DispatcherThread implements Runnable, Dispatcher {
+
+ private final ThreadDispatchQueue dispatchQueues[];
+
+ private static final boolean DEBUG = false;
+ private Thread thread;
+ protected boolean running = false;
+ private boolean threaded = false;
+ protected final int MAX_USER_PRIORITY;
+ protected final HashSet<PriorityDispatchContext> contexts = new HashSet<PriorityDispatchContext>();
+
+ // Set if this dispatcher is part of a dispatch pool:
+ protected final DispatcherPool pooledDispatcher;
+
+ // The local dispatch queue:
+ protected final PriorityLinkedList<PriorityDispatchContext> priorityQueue;
+
+ // Dispatch queue for requests from other threads:
+ private final LinkedNodeList<ForeignEvent>[] foreignQueue;
+ private static final int[] TOGGLE = new int[] { 1, 0 };
+ private int foreignToggle = 0;
+
+ // Timed Execution List
+ protected final TimerHeap<Runnable> timerHeap = new TimerHeap<Runnable>() {
+ @Override
+ protected final void execute(Runnable ready) {
+ ready.run();
+ }
+ };
+
+ protected final String name;
+ private final AtomicBoolean foreignAvailable = new AtomicBoolean(false);
+ private final Semaphore foreignPermits = new Semaphore(0);
+
+ private final Mapper<Integer, PriorityDispatchContext> PRIORITY_MAPPER = new Mapper<Integer, PriorityDispatchContext>() {
+ public Integer map(PriorityDispatchContext element) {
+ return element.listPrio;
+ }
+ };
+
+ protected DispatcherThread(String name, int priorities, DispatcherPool pooledDispactcher) {
+ this.name = name;
+
+ this.dispatchQueues = new ThreadDispatchQueue[3];
+ for (int i = 0; i < 3; i++) {
+ dispatchQueues[i] = new ThreadDispatchQueue(this, DispatchQueuePriority.values()[i]);
+ }
+
+ MAX_USER_PRIORITY = priorities - 1;
+ priorityQueue = new PriorityLinkedList<PriorityDispatchContext>(MAX_USER_PRIORITY + 1, PRIORITY_MAPPER);
+ foreignQueue = createForeignEventQueue();
+ for (int i = 0; i < 2; i++) {
+ foreignQueue[i] = new LinkedNodeList<ForeignEvent>();
+ }
+ this.pooledDispatcher = pooledDispactcher;
+ }
+
+ public static final Dispatcher createPriorityDispatcher(String name, int numPriorities) {
+ return new DispatcherThread(name, numPriorities, null);
+ }
+
+ public static final Dispatcher createPriorityDispatchPool(String name, final int numPriorities, int size) {
+ return new DispatcherPool(name, size, numPriorities);
+ }
+
+ @SuppressWarnings("unchecked")
+ private LinkedNodeList<ForeignEvent>[] createForeignEventQueue() {
+ return new LinkedNodeList[2];
+ }
+
+ protected abstract class ForeignEvent extends LinkedNode<ForeignEvent> {
+ public abstract void execute();
+
+ final void addToList() {
+ synchronized (foreignQueue) {
+ if (!this.isLinked()) {
+ foreignQueue[foreignToggle].addLast(this);
+ if (!foreignAvailable.getAndSet(true)) {
+ wakeup();
+ }
+ }
+ }
+ }
+ }
+
+ public boolean isThreaded() {
+ return threaded;
+ }
+
+ public void setThreaded(boolean threaded) {
+ this.threaded = threaded;
+ }
+
+ public int getDispatchPriorities() {
+ return MAX_USER_PRIORITY;
+ }
+
+ private class UpdateEvent extends ForeignEvent {
+ private final PriorityDispatchContext pdc;
+
+ UpdateEvent(PriorityDispatchContext pdc) {
+ this.pdc = pdc;
+ }
+
+ // Can only be called by the owner of this dispatch context:
+ public void execute() {
+ pdc.processForeignUpdates();
+ }
+ }
+
+ public DispatchContext register(Dispatchable dispatchable, String name) {
+ return new PriorityDispatchContext(dispatchable, true, name);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.activemq.dispatch.IDispatcher#start()
+ */
+ public synchronized final void start() {
+ if (thread == null) {
+ running = true;
+ thread = new Thread(this, name);
+ thread.start();
+ }
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.activemq.dispatch.IDispatcher#shutdown()
+ */
+ public void shutdown() throws InterruptedException {
+ Thread joinThread = null;
+ synchronized (this) {
+ if (thread != null) {
+ dispatchInternal(new RunnableAdapter() {
+ public void run() {
+ running = false;
+ }
+ }, MAX_USER_PRIORITY + 1);
+ joinThread = thread;
+ thread = null;
+ }
+ }
+ if (joinThread != null) {
+ // thread.interrupt();
+ joinThread.join();
+ }
+ }
+
+ protected void cleanup() {
+ ArrayList<PriorityDispatchContext> toClose = null;
+ synchronized (this) {
+ running = false;
+ toClose = new ArrayList<PriorityDispatchContext>(contexts.size());
+ toClose.addAll(contexts);
+ }
+
+ for (PriorityDispatchContext context : toClose) {
+ context.close(false);
+ }
+ }
+
+ public void run() {
+
+ if (pooledDispatcher != null) {
+ // Inform the dispatcher that we have started:
+ pooledDispatcher.onDispatcherStarted((DispatcherThread) this);
+ }
+
+ PriorityDispatchContext pdc;
+ try {
+ final int MAX_DISPATCH_PER_LOOP = 20;
+ int processed = 0;
+
+ while (running) {
+ pdc = priorityQueue.poll();
+ // If no local work available wait for foreign work:
+ if (pdc == null) {
+ waitForEvents();
+ } else {
+ if( pdc.priority < dispatchQueues.length ) {
+ DispatchSystem.CURRENT_QUEUE.set(dispatchQueues[pdc.priority]);
+ }
+
+ if (pdc.tracker != null) {
+ pooledDispatcher.setCurrentDispatchContext(pdc);
+ }
+
+ while (!pdc.dispatch()) {
+ processed++;
+ if (processed > MAX_DISPATCH_PER_LOOP || pdc.listPrio < priorityQueue.getHighestPriority()) {
+ // Give other dispatchables a shot:
+ // May have gotten relinked by the caller:
+ if (!pdc.isLinked()) {
+ priorityQueue.add(pdc, pdc.listPrio);
+ }
+ break;
+ }
+ }
+
+ if (pdc.tracker != null) {
+ pooledDispatcher.setCurrentDispatchContext(null);
+ }
+
+ if (processed < MAX_DISPATCH_PER_LOOP) {
+ continue;
+ }
+ }
+
+ processed = 0;
+ // Execute delayed events:
+ timerHeap.executeReadyTimers();
+
+ // Allow subclasses to do additional work:
+ dispatchHook();
+
+ // Check for foreign dispatch requests:
+ if (foreignAvailable.get()) {
+ LinkedNodeList<ForeignEvent> foreign;
+ synchronized (foreignQueue) {
+ // Swap foreign queues and drain permits;
+ foreign = foreignQueue[foreignToggle];
+ foreignToggle = TOGGLE[foreignToggle];
+ foreignAvailable.set(false);
+ foreignPermits.drainPermits();
+ }
+ while (true) {
+ ForeignEvent fe = foreign.getHead();
+ if (fe == null) {
+ break;
+ }
+
+ fe.unlink();
+ fe.execute();
+ }
+
+ }
+ }
+ } catch (InterruptedException e) {
+ return;
+ } catch (Throwable thrown) {
+ thrown.printStackTrace();
+ } finally {
+ if (pooledDispatcher != null) {
+ pooledDispatcher.onDispatcherStopped((DispatcherThread) this);
+ }
+ cleanup();
+ }
+ }
+
+ /**
+ * Subclasses may override this to do do additional dispatch work:
+ */
+ protected void dispatchHook() throws Exception {
+
+ }
+
+ /**
+ * Subclasses may override this to implement another mechanism for wakeup.
+ *
+ * @throws Exception
+ */
+ protected void waitForEvents() throws Exception {
+ long next = timerHeap.timeToNext(TimeUnit.NANOSECONDS);
+ if (next == -1) {
+ foreignPermits.acquire();
+ } else if (next > 0) {
+ foreignPermits.tryAcquire(next, TimeUnit.NANOSECONDS);
+ }
+ }
+
+ /**
+ * Subclasses may override this to provide an alternative wakeup mechanism.
+ */
+ protected void wakeup() {
+ foreignPermits.release();
+ }
+
+ protected final void onForeignUpdate(PriorityDispatchContext context) {
+ synchronized (foreignQueue) {
+
+ ForeignEvent fe = context.updateEvent[foreignToggle];
+ if (!fe.isLinked()) {
+ foreignQueue[foreignToggle].addLast(fe);
+ if (!foreignAvailable.getAndSet(true)) {
+ wakeup();
+ }
+ }
+ }
+ }
+
+ protected final boolean removeDispatchContext(PriorityDispatchContext context) {
+ synchronized (foreignQueue) {
+
+ if (context.updateEvent[0].isLinked()) {
+ context.updateEvent[0].unlink();
+ }
+ if (context.updateEvent[1].isLinked()) {
+ context.updateEvent[1].unlink();
+ }
+ }
+
+ if (context.isLinked()) {
+ context.unlink();
+ return true;
+ }
+
+ synchronized (this) {
+ contexts.remove(context);
+ }
+
+ return false;
+ }
+
+ protected final boolean takeOwnership(PriorityDispatchContext context) {
+ synchronized (this) {
+ if (running) {
+ contexts.add(context);
+ } else {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ //Special dispatch method that allow high priority dispatch:
+ private final void dispatchInternal(Dispatchable dispatchable, int priority) {
+ PriorityDispatchContext context = new PriorityDispatchContext(dispatchable, false, name);
+ context.priority = priority;
+ context.requestDispatch();
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.activemq.dispatch.IDispatcher#dispatch(org.apache.activemq
+ * .dispatch.Dispatcher.Dispatchable)
+ */
+ public final void dispatch(Dispatchable dispatchable, int priority) {
+ PriorityDispatchContext context = new PriorityDispatchContext(dispatchable, false, name);
+ context.updatePriority(priority);
+ context.requestDispatch();
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.activemq.dispatch.IDispatcher#createPriorityExecutor(int)
+ */
+ public Executor createPriorityExecutor(final int priority) {
+
+ return new Executor() {
+
+ public void execute(final Runnable runnable) {
+ dispatch(new RunnableAdapter(runnable), priority);
+ }
+ };
+ }
+
+ public void execute(final Runnable runnable) {
+ dispatch(new RunnableAdapter(runnable), 0);
+ }
+
+ public void execute(final Runnable runnable, int prio) {
+ dispatch(new RunnableAdapter(runnable), prio);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.activemq.dispatch.IDispatcher#schedule(java.lang.Runnable,
+ * long, java.util.concurrent.TimeUnit)
+ */
+ public void schedule(final Runnable runnable, final long delay, final TimeUnit timeUnit) {
+ schedule(runnable, 0, delay, timeUnit);
+ }
+
+ public void schedule(final Runnable runnable, final int prio, final long delay, final TimeUnit timeUnit) {
+ final Runnable wrapper = new Runnable() {
+ public void run() {
+ execute(runnable, prio);
+ }
+ };
+ if (getCurrentDispatcher() == this) {
+ timerHeap.addRelative(wrapper, delay, timeUnit);
+ } else {
+ new ForeignEvent() {
+ public void execute() {
+ timerHeap.addRelative(wrapper, delay, timeUnit);
+ }
+ }.addToList();
+ }
+ }
+
+ public String toString() {
+ return name;
+ }
+
+ private final DispatcherThread getCurrentDispatcher() {
+ if (pooledDispatcher != null) {
+ return (DispatcherThread) pooledDispatcher.getCurrentDispatcher();
+ } else if (Thread.currentThread() == thread) {
+ return (DispatcherThread) this;
+ } else {
+ return null;
+ }
+
+ }
+
+ private final PooledDispatchContext getCurrentDispatchContext() {
+ return pooledDispatcher.getCurrentDispatchContext();
+ }
+
+ /**
+ *
+ */
+ protected class PriorityDispatchContext extends LinkedNode<PriorityDispatchContext> implements PooledDispatchContext {
+ // The dispatchable target:
+ private final Dispatchable dispatchable;
+ // The name of this context:
+ final String name;
+ // list prio can only be updated in the thread of of the owning
+ // dispatcher
+ protected int listPrio;
+
+ // The update events are used to update fields in the dispatch context
+ // from foreign threads:
+ final UpdateEvent updateEvent[];
+
+ private final ExecutionTracker tracker;
+ protected DispatcherThread currentOwner;
+ private DispatcherThread updateDispatcher = null;
+
+ private int priority;
+ private boolean dispatchRequested = false;
+ private boolean closed = false;
+ final CountDownLatch closeLatch = new CountDownLatch(1);
+
+ protected PriorityDispatchContext(Dispatchable dispatchable, boolean persistent, String name) {
+ this.dispatchable = dispatchable;
+ this.name = name;
+ this.currentOwner = (DispatcherThread) DispatcherThread.this;
+ if (persistent && pooledDispatcher != null) {
+ this.tracker = pooledDispatcher.getLoadBalancer().createExecutionTracker((PooledDispatchContext) this);
+ } else {
+ this.tracker = null;
+ }
+ updateEvent = createUpdateEvent();
+ updateEvent[0] = new UpdateEvent(this);
+ updateEvent[1] = new UpdateEvent(this);
+ if (persistent) {
+ currentOwner.takeOwnership(this);
+ }
+ }
+
+ private final DispatcherThread.UpdateEvent[] createUpdateEvent() {
+ return new DispatcherThread.UpdateEvent[2];
+ }
+
+ /**
+ * Gets the execution tracker for the context.
+ *
+ * @return the execution tracker for the context:
+ */
+ public ExecutionTracker getExecutionTracker() {
+ return tracker;
+ }
+
+ /**
+ * This can only be called by the owning dispatch thread:
+ *
+ * @return False if the dispatchable has more work to do.
+ */
+ public final boolean dispatch() {
+ return dispatchable.dispatch();
+ }
+
+ public final void assignToNewDispatcher(Dispatcher newDispatcher) {
+ synchronized (this) {
+
+ // If we're already set to this dispatcher
+ if (newDispatcher == currentOwner) {
+ if (updateDispatcher == null || updateDispatcher == newDispatcher) {
+ return;
+ }
+ }
+
+ updateDispatcher = (DispatcherThread) newDispatcher;
+ if (DEBUG)
+ System.out.println(getName() + " updating to " + updateDispatcher);
+
+ currentOwner.onForeignUpdate(this);
+ }
+
+ }
+
+ public void requestDispatch() {
+
+ DispatcherThread callingDispatcher = getCurrentDispatcher();
+ if (tracker != null)
+ tracker.onDispatchRequest(callingDispatcher, getCurrentDispatchContext());
+
+ // Otherwise this is coming off another thread, so we need to
+ // synchronize
+ // to protect against ownership changes:
+ synchronized (this) {
+ // If the owner of this context is the calling thread, then
+ // delegate to the dispatcher.
+ if (currentOwner == callingDispatcher) {
+
+ if (!currentOwner.running) {
+ // TODO In the event that the current dispatcher
+ // failed due to a runtime exception, we could
+ // try to switch to a new dispatcher.
+ throw new RejectedExecutionException();
+ }
+ if (!isLinked()) {
+ currentOwner.priorityQueue.add(this, listPrio);
+ }
+ return;
+ }
+
+ dispatchRequested = true;
+ currentOwner.onForeignUpdate(this);
+ }
+ }
+
+ public void updatePriority(int priority) {
+
+ if (closed) {
+ return;
+ }
+
+ priority = Math.min(priority, MAX_USER_PRIORITY);
+
+ if (this.priority == priority) {
+ return;
+ }
+ DispatcherThread callingDispatcher = getCurrentDispatcher();
+
+ // Otherwise this is coming off another thread, so we need to
+ // synchronize to protect against ownership changes:
+ synchronized (this) {
+ if (closed) {
+ return;
+ }
+ this.priority = priority;
+
+ // If this is called by the owning dispatcher, then we go ahead
+ // and update:
+ if (currentOwner == callingDispatcher) {
+
+ if (priority != listPrio) {
+
+ listPrio = priority;
+ // If there is a priority change relink the context
+ // at the new priority:
+ if (isLinked()) {
+ unlink();
+ currentOwner.priorityQueue.add(this, listPrio);
+ }
+ }
+ return;
+ }
+
+ currentOwner.onForeignUpdate(this);
+ }
+
+ }
+
+ public void processForeignUpdates() {
+ synchronized (this) {
+
+ if (closed) {
+ close(false);
+ return;
+ }
+
+ if (updateDispatcher != null && updateDispatcher.takeOwnership(this)) {
+ if (DEBUG) {
+ System.out.println("Assigning " + getName() + " to " + updateDispatcher);
+ }
+
+ if (currentOwner.removeDispatchContext(this)) {
+ dispatchRequested = true;
+ }
+
+ updateDispatcher.onForeignUpdate(this);
+ switchedDispatcher(currentOwner, updateDispatcher);
+ currentOwner = updateDispatcher;
+ updateDispatcher = null;
+
+ } else {
+ updatePriority(priority);
+
+ if (dispatchRequested) {
+ dispatchRequested = false;
+ requestDispatch();
+ }
+ }
+ }
+ }
+
+ /**
+ * May be overriden by subclass to additional work on dispatcher switch
+ *
+ * @param oldDispatcher The old dispatcher
+ * @param newDispatcher The new Dispatcher
+ */
+ protected void switchedDispatcher(DispatcherThread oldDispatcher, DispatcherThread newDispatcher) {
+
+ }
+
+ public boolean isClosed() {
+ return closed;
+ }
+
+ public void close(boolean sync) {
+ DispatcherThread callingDispatcher = getCurrentDispatcher();
+ // System.out.println(this + "Closing");
+ synchronized (this) {
+ closed = true;
+ // If the owner of this context is the calling thread, then
+ // delegate to the dispatcher.
+ if (currentOwner == callingDispatcher) {
+ removeDispatchContext(this);
+ closeLatch.countDown();
+ return;
+ }
+ }
+
+ currentOwner.onForeignUpdate(this);
+ if (sync) {
+ boolean interrupted = false;
+ while (true) {
+ try {
+ closeLatch.await();
+ break;
+ } catch (InterruptedException e) {
+ interrupted = true;
+ }
+ }
+
+ if (interrupted) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+
+ public final String toString() {
+ return getName();
+ }
+
+ public Dispatchable getDispatchable() {
+ return dispatchable;
+ }
+
+ public DispatcherThread getDispatcher() {
+ return currentOwner;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ }
+
+ public String getName() {
+ return name;
+ }
+}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/GlobalDispatchQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/GlobalDispatchQueue.java?rev=887117&r1=887116&r2=887117&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/GlobalDispatchQueue.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/GlobalDispatchQueue.java Fri Dec 4 08:56:50 2009
@@ -16,11 +16,11 @@
*/
package org.apache.activemq.dispatch.internal.advanced;
-import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.dispatch.DispatchQueue;
import org.apache.activemq.dispatch.DispatchSystem.DispatchQueuePriority;
+import org.apache.activemq.dispatch.internal.QueueSupport;
/**
*
@@ -28,14 +28,16 @@
*/
public class GlobalDispatchQueue implements DispatchQueue {
- private final AdancedDispatchSystem system;
- private Executor executor;
- final String label;
+ private final String label;
+ private final AdancedDispatchSPI system;
+ private final DispatcherPool dispatcher;
+ private final DispatchQueuePriority priority;
- public GlobalDispatchQueue(AdancedDispatchSystem system, DispatchQueuePriority priority) {
+ public GlobalDispatchQueue(AdancedDispatchSPI system, DispatchQueuePriority priority) {
this.system = system;
+ this.priority = priority;
this.label=priority.toString();
- executor = this.system.pooledDispatcher.createPriorityExecutor(priority.ordinal());
+ this.dispatcher = this.system.pooledDispatcher;
}
public String getLabel() {
@@ -43,11 +45,11 @@
}
public void dispatchAsync(Runnable runnable) {
- executor.execute(runnable);
+ dispatcher.execute(runnable, priority.ordinal());
}
public void dispatchAfter(long delayMS, Runnable runnable) {
- this.system.pooledDispatcher.schedule(runnable, delayMS, TimeUnit.MILLISECONDS);
+ dispatcher.schedule(runnable, priority.ordinal(), delayMS, TimeUnit.MILLISECONDS);
}
public void dispatchSync(final Runnable runnable) throws InterruptedException {
Added: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/LoadBalancer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/LoadBalancer.java?rev=887117&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/LoadBalancer.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/LoadBalancer.java Fri Dec 4 08:56:50 2009
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.dispatch.internal.advanced;
+
+
+import org.apache.activemq.dispatch.internal.advanced.Dispatcher.DispatchContext;
+
+public interface LoadBalancer {
+
+ public interface ExecutionTracker {
+
+ /**
+ * Should be called when a {@link DispatchContext#requestDispatch()} is called.
+ * This assists the load balancer in determining relationships between {@link DispatchContext}s
+ * @param caller The calling dispatcher
+ * @param context The context from which the dispatch is requested.
+ */
+ public void onDispatchRequest(Dispatcher caller, PooledDispatchContext context);
+
+ /**
+ * Must be called by the dispatcher when a {@link DispatchContext} is closed.
+ */
+ public void close();
+ }
+
+ /**
+ * Must be called by a dispatch thread when it starts
+ * @param dispatcher The dispatcher
+ */
+ public void onDispatcherStarted(Dispatcher dispatcher);
+
+ /**
+ * Must be called by a dispatch thread when it stops
+ * @param dispatcher The dispatcher
+ */
+ public void onDispatcherStopped(Dispatcher dispatcher);
+
+ /**
+ * Gets an {@link ExecutionTracker} for the dispatch context.
+ * @param context
+ * @return
+ */
+ public ExecutionTracker createExecutionTracker(PooledDispatchContext context);
+
+ /**
+ * Starts execution tracking
+ */
+ public void start();
+
+ /**
+ * Stops execution tracking
+ */
+ public void stop();
+}
Added: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/PooledDispatchContext.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/PooledDispatchContext.java?rev=887117&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/PooledDispatchContext.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/PooledDispatchContext.java Fri Dec 4 08:56:50 2009
@@ -0,0 +1,31 @@
+package org.apache.activemq.dispatch.internal.advanced;
+
+import org.apache.activemq.dispatch.internal.advanced.LoadBalancer.ExecutionTracker;
+import org.apache.activemq.dispatch.internal.advanced.Dispatcher.DispatchContext;
+
+/**
+ * A {@link PooledDispatchContext}s can be moved between different
+ * dispatchers.
+ */
+public interface PooledDispatchContext extends DispatchContext {
+ /**
+ * Called to transfer a {@link PooledDispatchContext} to a new
+ * Dispatcher.
+ */
+ public void assignToNewDispatcher(Dispatcher newDispatcher);
+
+ /**
+ * Gets the dispatcher to which this PooledDispatchContext currently
+ * belongs
+ *
+ * @return
+ */
+ public Dispatcher getDispatcher();
+
+ /**
+ * Gets the execution tracker for the context.
+ *
+ * @return the execution tracker for the context:
+ */
+ public ExecutionTracker getExecutionTracker();
+}
\ No newline at end of file
Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/SimpleLoadBalancer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/SimpleLoadBalancer.java?rev=887117&r1=887116&r2=887117&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/SimpleLoadBalancer.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/SimpleLoadBalancer.java Fri Dec 4 08:56:50 2009
@@ -22,16 +22,15 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.activemq.dispatch.internal.advanced.PooledDispatcher.PooledDispatchContext;
-public class SimpleLoadBalancer implements ExecutionLoadBalancer {
+public class SimpleLoadBalancer implements LoadBalancer {
private final boolean DEBUG = false;
//TODO: Added plumbing for periodic rebalancing which we should
//consider implementing
private static final boolean ENABLE_UPDATES = false;
- private final ArrayList<IDispatcher> dispatchers = new ArrayList<IDispatcher>();
+ private final ArrayList<Dispatcher> dispatchers = new ArrayList<Dispatcher>();
private AtomicBoolean running = new AtomicBoolean(false);
private boolean needsUpdate = false;
@@ -86,7 +85,7 @@
running.compareAndSet(true, false);
}
- public synchronized final void onDispatcherStarted(IDispatcher dispatcher) {
+ public synchronized final void onDispatcherStarted(Dispatcher dispatcher) {
dispatchers.add(dispatcher);
scheduleNext();
}
@@ -94,7 +93,7 @@
/**
* A Dispatcher must call this when exiting it's dispatch loop
*/
- public void onDispatcherStopped(IDispatcher dispatcher) {
+ public void onDispatcherStopped(Dispatcher dispatcher) {
dispatchers.remove(dispatcher);
}
@@ -123,7 +122,7 @@
private final AtomicInteger work = new AtomicInteger(0);
private PooledDispatchContext singleSource;
- private IDispatcher currentOwner;
+ private Dispatcher currentOwner;
SimpleExecutionTracker(PooledDispatchContext context) {
this.context = context;
@@ -144,7 +143,7 @@
* @return True if this method resulted in the dispatch request being
* assigned to another dispatcher.
*/
- public void onDispatchRequest(IDispatcher callingDispatcher, PooledDispatchContext callingContext) {
+ public void onDispatchRequest(Dispatcher callingDispatcher, PooledDispatchContext callingContext) {
if (callingContext != null) {
// Make sure we are being called by another node: