You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/12/04 20:48:22 UTC
svn commit: r887333 - in /activemq/sandbox/activemq-apollo-actor:
activemq-broker/src/main/java/org/apache/activemq/apollo/
activemq-broker/src/main/java/org/apache/activemq/apollo/broker/
activemq-broker/src/test/java/org/apache/activemq/broker/ activ...
Author: chirino
Date: Fri Dec 4 19:48:21 2009
New Revision: 887333
URL: http://svn.apache.org/viewvc?rev=887333&view=rev
Log:
AdvancedDispatchSPI is now used directly be the rest of the activemq modules..
Added:
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchSPI.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatcherAware.java
- copied, changed from r887252, activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherAware.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdvancedDispatchSPI.java
Removed:
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdancedDispatchSPI.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherAware.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherPool.java
Modified:
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/Connection.java
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Broker.java
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerDatabase.java
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerQueueStore.java
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/SharedQueueTest.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchSystem.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherThread.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/GlobalDispatchQueue.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/DispatcherThread.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleDispatchSPI.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/DispatchSystemTest.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/internal/advanced/DispatcherPoolTest.java
activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/main/java/org/apache/activemq/apollo/jaxb/DispatcherXml.java
activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/test/java/org/apache/activemq/apollo/jaxb/JAXBConfigTest.java
activemq/sandbox/activemq-apollo-actor/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java
activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/AbstractFlowQueue.java
activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/IFlowQueue.java
activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/IQueue.java
activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/PartitionedQueue.java
activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/AbstractTestConnection.java
activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockBroker.java
activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockBrokerTest.java
activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockClient.java
activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/DispatchableTransport.java
activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/Connection.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/Connection.java?rev=887333&r1=887332&r2=887333&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/Connection.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/Connection.java Fri Dec 4 19:48:21 2009
@@ -25,7 +25,8 @@
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.Service;
-import org.apache.activemq.dispatch.internal.advanced.Dispatcher;
+import org.apache.activemq.dispatch.internal.advanced.AdvancedDispatchSPI;
+import org.apache.activemq.dispatch.internal.advanced.AdvancedDispatchSPI;
import org.apache.activemq.transport.DispatchableTransport;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportListener;
@@ -42,7 +43,7 @@
protected int inputResumeThreshold = 512 * 1024;
protected boolean useAsyncWriteThread = true;
- private Dispatcher dispatcher;
+ private AdvancedDispatchSPI dispatcher;
private final AtomicBoolean stopping = new AtomicBoolean();
private ExecutorService blockingWriter;
private ExceptionListener exceptionListener;
@@ -170,11 +171,11 @@
this.priorityLevels = priorityLevels;
}
- public Dispatcher getDispatcher() {
+ public AdvancedDispatchSPI getDispatcher() {
return dispatcher;
}
- public void setDispatcher(Dispatcher dispatcher) {
+ public void setDispatcher(AdvancedDispatchSPI dispatcher) {
this.dispatcher = dispatcher;
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Broker.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Broker.java?rev=887333&r1=887332&r2=887333&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Broker.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Broker.java Fri Dec 4 19:48:21 2009
@@ -25,8 +25,10 @@
import org.apache.activemq.Service;
import org.apache.activemq.apollo.Connection;
-import org.apache.activemq.dispatch.internal.advanced.DispatcherAware;
-import org.apache.activemq.dispatch.internal.advanced.Dispatcher;
+import org.apache.activemq.dispatch.DispatcherAware;
+import org.apache.activemq.dispatch.internal.advanced.AdvancedDispatchSPI;
+import org.apache.activemq.dispatch.internal.advanced.AdvancedDispatchSPI;
+import org.apache.activemq.dispatch.internal.advanced.AdvancedDispatchSPI;
import org.apache.activemq.dispatch.internal.advanced.DispatcherThread;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportAcceptListener;
@@ -51,7 +53,7 @@
private final LinkedHashMap<AsciiBuffer, VirtualHost> virtualHosts = new LinkedHashMap<AsciiBuffer, VirtualHost>();
private VirtualHost defaultVirtualHost;
- private Dispatcher dispatcher;
+ private AdvancedDispatchSPI dispatcher;
private File dataDirectory;
private final class BrokerAcceptListener implements TransportAcceptListener {
@@ -129,7 +131,7 @@
// apply some default configuration to this broker instance before it's started.
if( dispatcher == null ) {
int threads = Runtime.getRuntime().availableProcessors();
- dispatcher = DispatcherThread.createPriorityDispatchPool("Broker: "+getDefaultVirtualHost().getHostName(), Broker.MAX_PRIORITY, threads);
+ dispatcher = new AdvancedDispatchSPI(threads, Broker.MAX_PRIORITY);
}
@@ -376,10 +378,10 @@
// /////////////////////////////////////////////////////////////////
// Property Accessors
// /////////////////////////////////////////////////////////////////
- public Dispatcher getDispatcher() {
+ public AdvancedDispatchSPI getDispatcher() {
return dispatcher;
}
- public void setDispatcher(Dispatcher dispatcher) {
+ public void setDispatcher(AdvancedDispatchSPI dispatcher) {
assertInConfigurationState();
this.dispatcher = dispatcher;
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerDatabase.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerDatabase.java?rev=887333&r1=887332&r2=887333&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerDatabase.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerDatabase.java Fri Dec 4 19:48:21 2009
@@ -40,8 +40,8 @@
import org.apache.activemq.broker.store.Store.QueueQueryResult;
import org.apache.activemq.broker.store.Store.QueueRecord;
import org.apache.activemq.broker.store.Store.Session;
-import org.apache.activemq.dispatch.internal.advanced.DispatcherAware;
-import org.apache.activemq.dispatch.internal.advanced.Dispatcher;
+import org.apache.activemq.dispatch.DispatcherAware;
+import org.apache.activemq.dispatch.internal.advanced.AdvancedDispatchSPI;
import org.apache.activemq.flow.AbstractLimitedFlowResource;
import org.apache.activemq.flow.Flow;
import org.apache.activemq.flow.FlowController;
@@ -71,7 +71,7 @@
private final FlowController<OperationBase<?>> storeController;
private final int FLUSH_QUEUE_SIZE = 10000 * 1024;
- private Dispatcher dispatcher;
+ private AdvancedDispatchSPI dispatcher;
private Thread flushThread;
private AtomicBoolean running = new AtomicBoolean(false);
private DatabaseListener listener;
@@ -1288,11 +1288,11 @@
return store.allocateStoreTracking();
}
- public Dispatcher getDispatcher() {
+ public AdvancedDispatchSPI getDispatcher() {
return dispatcher;
}
- public void setDispatcher(Dispatcher dispatcher) {
+ public void setDispatcher(AdvancedDispatchSPI dispatcher) {
this.dispatcher = dispatcher;
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerQueueStore.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerQueueStore.java?rev=887333&r1=887332&r2=887333&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerQueueStore.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerQueueStore.java Fri Dec 4 19:48:21 2009
@@ -24,7 +24,7 @@
import org.apache.activemq.broker.store.Store.MessageRecord;
import org.apache.activemq.broker.store.Store.QueueQueryResult;
-import org.apache.activemq.dispatch.internal.advanced.Dispatcher;
+import org.apache.activemq.dispatch.internal.advanced.AdvancedDispatchSPI;
import org.apache.activemq.flow.ISourceController;
import org.apache.activemq.flow.PrioritySizeLimiter;
import org.apache.activemq.flow.SizeLimiter;
@@ -52,7 +52,7 @@
private static final boolean USE_PRIORITY_QUEUES = true;
private BrokerDatabase database;
- private Dispatcher dispatcher;
+ private AdvancedDispatchSPI dispatcher;
private static HashMap<String, ProtocolHandler> protocolHandlers = new HashMap<String, ProtocolHandler>();
private static final BrokerDatabase.MessageRecordMarshaller<MessageDelivery> MESSAGE_MARSHALLER = new BrokerDatabase.MessageRecordMarshaller<MessageDelivery>() {
@@ -226,7 +226,7 @@
this.database = database;
}
- public void setDispatcher(Dispatcher dispatcher) {
+ public void setDispatcher(AdvancedDispatchSPI dispatcher) {
this.dispatcher = dispatcher;
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java?rev=887333&r1=887332&r2=887333&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java Fri Dec 4 19:48:21 2009
@@ -30,7 +30,8 @@
import org.apache.activemq.apollo.broker.Router;
import org.apache.activemq.broker.store.Store;
import org.apache.activemq.broker.store.StoreFactory;
-import org.apache.activemq.dispatch.internal.advanced.Dispatcher;
+import org.apache.activemq.dispatch.internal.advanced.AdvancedDispatchSPI;
+import org.apache.activemq.dispatch.internal.advanced.AdvancedDispatchSPI;
import org.apache.activemq.dispatch.internal.advanced.DispatcherThread;
import org.apache.activemq.metric.MetricAggregator;
import org.apache.activemq.metric.Period;
@@ -87,7 +88,7 @@
protected Broker sendBroker;
protected Broker rcvBroker;
protected ArrayList<Broker> brokers = new ArrayList<Broker>();
- protected Dispatcher dispatcher;
+ protected AdvancedDispatchSPI dispatcher;
protected final AtomicLong msgIdGenerator = new AtomicLong();
protected final AtomicBoolean stopping = new AtomicBoolean();
@@ -134,8 +135,8 @@
protected abstract String getRemoteWireFormat();
- protected Dispatcher createDispatcher() {
- return DispatcherThread.createPriorityDispatchPool("BrokerDispatcher", Broker.MAX_PRIORITY, asyncThreadPoolSize);
+ protected AdvancedDispatchSPI createDispatcher() {
+ return new AdvancedDispatchSPI(asyncThreadPoolSize, Broker.MAX_PRIORITY);
}
@Test
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/SharedQueueTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/SharedQueueTest.java?rev=887333&r1=887332&r2=887333&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/SharedQueueTest.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/SharedQueueTest.java Fri Dec 4 19:48:21 2009
@@ -27,7 +27,8 @@
import org.apache.activemq.apollo.broker.MessageDelivery;
import org.apache.activemq.broker.store.Store;
import org.apache.activemq.broker.store.StoreFactory;
-import org.apache.activemq.dispatch.internal.advanced.Dispatcher;
+import org.apache.activemq.dispatch.internal.advanced.AdvancedDispatchSPI;
+import org.apache.activemq.dispatch.internal.advanced.AdvancedDispatchSPI;
import org.apache.activemq.dispatch.internal.advanced.DispatcherThread;
import org.apache.activemq.queue.IQueue;
@@ -38,7 +39,7 @@
public class SharedQueueTest extends TestCase {
- Dispatcher dispatcher;
+ AdvancedDispatchSPI dispatcher;
BrokerDatabase database;
BrokerQueueStore queueStore;
private static final boolean USE_KAHA_DB = true;
@@ -47,8 +48,8 @@
protected ArrayList<IQueue<Long, MessageDelivery>> queues = new ArrayList<IQueue<Long, MessageDelivery>>();
- protected Dispatcher createDispatcher() {
- return DispatcherThread.createPriorityDispatchPool("TestDispatcher", Broker.MAX_PRIORITY, Runtime.getRuntime().availableProcessors());
+ protected AdvancedDispatchSPI createDispatcher() {
+ return new AdvancedDispatchSPI(Runtime.getRuntime().availableProcessors(), Broker.MAX_PRIORITY);
}
protected int consumerStartDelay = 0;
Added: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchSPI.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchSPI.java?rev=887333&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchSPI.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchSPI.java Fri Dec 4 19:48:21 2009
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.dispatch;
+
+import java.nio.channels.SelectableChannel;
+
+import org.apache.activemq.dispatch.DispatchSystem.DispatchQueuePriority;
+
+public interface DispatchSPI {
+ public void start();
+ public void shutdown(Runnable onShutdown);
+
+ public DispatchQueue getMainQueue();
+ public DispatchQueue getGlobalQueue(DispatchQueuePriority priority);
+ public DispatchQueue createQueue(String label);
+ public void dispatchMain();
+ public DispatchSource createSource(SelectableChannel channel, int interestOps, DispatchQueue queue);
+}
\ No newline at end of file
Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchSystem.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchSystem.java?rev=887333&r1=887332&r2=887333&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchSystem.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchSystem.java Fri Dec 4 19:48:21 2009
@@ -32,14 +32,6 @@
LOW;
}
- static abstract public class DispatchSPI {
- abstract public DispatchQueue getMainQueue();
- abstract public DispatchQueue getGlobalQueue(DispatchQueuePriority priority);
- abstract public DispatchQueue createQueue(String label);
- abstract public void dispatchMain();
- abstract public DispatchSource createSource(SelectableChannel channel, int interestOps, DispatchQueue queue);
- }
-
public final static ThreadLocal<DispatchQueue> CURRENT_QUEUE = new ThreadLocal<DispatchQueue>();
static public DispatchQueue getCurrentQueue() {
return CURRENT_QUEUE.get();
Copied: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatcherAware.java (from r887252, activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherAware.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatcherAware.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatcherAware.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherAware.java&r1=887252&r2=887333&rev=887333&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherAware.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatcherAware.java Fri Dec 4 19:48:21 2009
@@ -1,4 +1,6 @@
-package org.apache.activemq.dispatch.internal.advanced;
+package org.apache.activemq.dispatch;
+
+import org.apache.activemq.dispatch.internal.advanced.AdvancedDispatchSPI;
/**
* Handy interface to signal classes which would like an IDispatcher instance
@@ -8,6 +10,6 @@
*/
public interface DispatcherAware {
- public void setDispatcher(Dispatcher dispatcher);
+ public void setDispatcher(AdvancedDispatchSPI dispatcher);
}
Added: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdvancedDispatchSPI.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdvancedDispatchSPI.java?rev=887333&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdvancedDispatchSPI.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdvancedDispatchSPI.java Fri Dec 4 19:48:21 2009
@@ -0,0 +1,239 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.dispatch.internal.advanced;
+
+import java.nio.channels.SelectableChannel;
+import java.util.ArrayList;
+import java.util.concurrent.Executor;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.activemq.dispatch.DispatchQueue;
+import org.apache.activemq.dispatch.DispatchSPI;
+import org.apache.activemq.dispatch.DispatchSource;
+import org.apache.activemq.dispatch.DispatchSystem.DispatchQueuePriority;
+import org.apache.activemq.dispatch.internal.SerialDispatchQueue;
+
+import static org.apache.activemq.dispatch.DispatchSystem.DispatchQueuePriority.*;
+
+public class AdvancedDispatchSPI implements DispatchSPI {
+
+ final SerialDispatchQueue mainQueue = new SerialDispatchQueue("main");
+ final GlobalDispatchQueue globalQueues[];
+ final AtomicLong globalQueuedRunnables = new AtomicLong();
+
+ private final ThreadLocal<DispatcherThread> dispatcher = new ThreadLocal<DispatcherThread>();
+ private final ThreadLocal<PooledDispatchContext> dispatcherContext = new ThreadLocal<PooledDispatchContext>();
+ private final ArrayList<DispatcherThread> dispatchers = new ArrayList<DispatcherThread>();
+
+ final AtomicInteger startCounter = new AtomicInteger();
+// final AtomicBoolean started = new AtomicBoolean();
+// final AtomicBoolean shutdown = new AtomicBoolean();
+
+ private int roundRobinCounter = 0;
+ private int size;
+ private final int numPriorities;
+
+ protected LoadBalancer loadBalancer;
+
+ public AdvancedDispatchSPI(int size, int numPriorities) {
+ this.size = size;
+ this.numPriorities = numPriorities;
+
+ globalQueues = new GlobalDispatchQueue[3];
+ for (int i = 0; i < 3; i++) {
+ globalQueues[i] = new GlobalDispatchQueue(this, DispatchQueuePriority.values()[i]);
+ }
+
+ loadBalancer = new SimpleLoadBalancer();
+ }
+
+ /**
+ * Subclasses should implement this to return a new dispatcher.
+ *
+ * @param name
+ * The name to assign the dispatcher.
+ * @param pool
+ * The pool.
+ * @return The new dispathcer.
+ */
+ protected DispatcherThread createDispatcher(String name) throws Exception {
+ return new DispatcherThread(this, name, numPriorities);
+ }
+
+ /**
+ * @see org.apache.activemq.dispatch.internal.advanced.Dispatcher#start()
+ */
+ public synchronized final void start() {
+ if( startCounter.getAndIncrement()==0 ) {
+ // Create all the workers.
+ try {
+ loadBalancer.start();
+ for (int i = 0; i < size; i++) {
+ DispatcherThread dispatacher = createDispatcher("dispatcher -" + (i + 1));
+ dispatchers.add(dispatacher);
+ dispatacher.start();
+ }
+ } catch (Exception e) {
+ shutdown();
+ }
+ }
+ }
+
+ public final void shutdown() {
+ shutdown(null);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.activemq.dispatch.IDispatcher#shutdown()
+ */
+ public final void shutdown(Runnable onShutdown) {
+ if( startCounter.decrementAndGet()==0 ) {
+ final AtomicInteger shutdownCountDown = new AtomicInteger(dispatchers.size());
+ for (DispatcherThread d : new ArrayList<DispatcherThread>(dispatchers)) {
+ d.shutdown(shutdownCountDown, onShutdown);
+ }
+ loadBalancer.stop();
+ }
+ }
+
+ public void setCurrentDispatchContext(PooledDispatchContext context) {
+ dispatcherContext.set(context);
+ }
+
+ public PooledDispatchContext getCurrentDispatchContext() {
+ return dispatcherContext.get();
+ }
+
+ /**
+ * Returns the currently executing dispatcher, or null if the current thread
+ * is not a dispatcher:
+ *
+ * @return The currently executing dispatcher
+ */
+ public Dispatcher getCurrentDispatcher() {
+ return dispatcher.get();
+ }
+
+ /**
+ * A Dispatcher must call this to indicate that is has started it's dispatch
+ * loop.
+ */
+ public void onDispatcherStarted(DispatcherThread d) {
+ dispatcher.set(d);
+ loadBalancer.onDispatcherStarted(d);
+ }
+
+ public LoadBalancer getLoadBalancer() {
+ return loadBalancer;
+ }
+
+ /**
+ * A Dispatcher must call this when exiting it's dispatch loop
+ */
+ public void onDispatcherStopped(Dispatcher d) {
+ synchronized (dispatchers) {
+ if (dispatchers.remove(d)) {
+ size--;
+ }
+ }
+ loadBalancer.onDispatcherStopped(d);
+ }
+
+ protected DispatcherThread chooseDispatcher() {
+ DispatcherThread d = dispatcher.get();
+ if (d == null) {
+ synchronized (dispatchers) {
+ if(dispatchers.isEmpty())
+ {
+ throw new RejectedExecutionException();
+ }
+ if (++roundRobinCounter >= size) {
+ roundRobinCounter = 0;
+ }
+ return dispatchers.get(roundRobinCounter);
+ }
+ } else {
+ return d;
+ }
+ }
+
+ public DispatchContext register(Runnable runnable, String name) {
+ return chooseDispatcher().register(runnable, name);
+ }
+
+ public int getSize() {
+ return size;
+ }
+
+ public final Executor createPriorityExecutor(final int priority) {
+ return new Executor() {
+ public void execute(final Runnable runnable) {
+ chooseDispatcher().dispatch(runnable, priority);
+ }
+
+ };
+ }
+
+ public int getDispatchPriorities() {
+ // TODO Auto-generated method stub
+ return numPriorities;
+ }
+
+ public void execute(Runnable command) {
+ chooseDispatcher().dispatch(command, 0);
+ }
+
+ public void execute(Runnable command, int priority) {
+ chooseDispatcher().dispatch(command, priority);
+ }
+
+ public void schedule(final Runnable runnable, long delay, TimeUnit timeUnit) {
+ chooseDispatcher().schedule(runnable, delay, timeUnit);
+ }
+
+ public void schedule(final Runnable runnable, int priority, long delay, TimeUnit timeUnit) {
+ chooseDispatcher().schedule(runnable, priority, delay, timeUnit);
+ }
+
+ public DispatchQueue getMainQueue() {
+ return mainQueue;
+ }
+
+ public DispatchQueue getGlobalQueue(DispatchQueuePriority priority) {
+ return globalQueues[priority.ordinal()];
+ }
+
+ public DispatchQueue createQueue(String label) {
+ SerialDispatchQueue rc = new SerialDispatchQueue(label);
+ rc.setTargetQueue(getGlobalQueue(DEFAULT));
+ return rc;
+ }
+
+ public void dispatchMain() {
+ mainQueue.run();
+ }
+
+ public DispatchSource createSource(SelectableChannel channel, int interestOps, DispatchQueue queue) {
+ return null;
+ }
+
+}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherThread.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherThread.java?rev=887333&r1=887332&r2=887333&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherThread.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherThread.java Fri Dec 4 19:48:21 2009
@@ -24,6 +24,7 @@
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.dispatch.DispatchObserver;
import org.apache.activemq.dispatch.DispatchSystem;
@@ -46,7 +47,7 @@
protected final HashSet<PriorityDispatchContext> contexts = new HashSet<PriorityDispatchContext>();
// Set if this dispatcher is part of a dispatch pool:
- protected final DispatcherPool dispatcherPool;
+ protected final AdvancedDispatchSPI spi;
// The local dispatch queue:
protected final PriorityLinkedList<PriorityDispatchContext> priorityQueue;
@@ -74,7 +75,7 @@
}
};
- protected DispatcherThread(String name, int priorities, DispatcherPool pooledDispactcher) {
+ protected DispatcherThread(AdvancedDispatchSPI spi, String name, int priorities) {
this.name = name;
this.dispatchQueues = new ThreadDispatchQueue[3];
@@ -88,15 +89,7 @@
for (int i = 0; i < 2; i++) {
foreignQueue[i] = new LinkedNodeList<ForeignEvent>();
}
- this.dispatcherPool = pooledDispactcher;
- }
-
- public static final Dispatcher createPriorityDispatcher(String name, int numPriorities) {
- return new DispatcherThread(name, numPriorities, null);
- }
-
- public static final Dispatcher createPriorityDispatchPool(String name, final int numPriorities, int size) {
- return new DispatcherPool(name, size, numPriorities);
+ this.spi = spi;
}
@SuppressWarnings("unchecked")
@@ -167,21 +160,33 @@
* @see org.apache.activemq.dispatch.IDispatcher#shutdown()
*/
public void shutdown() throws InterruptedException {
- Thread joinThread = null;
+ Thread joinThread = shutdown(new AtomicInteger(1), null);
+ if (joinThread != null) {
+ // thread.interrupt();
+ joinThread.join();
+ }
+ }
+
+ public Thread shutdown(final AtomicInteger shutdownCountDown, final Runnable onShutdown) {
synchronized (this) {
if (thread != null) {
dispatchInternal(new Runnable() {
public void run() {
running = false;
+ if( shutdownCountDown.decrementAndGet()==0 && onShutdown!=null) {
+ onShutdown.run();
+ }
}
}, MAX_USER_PRIORITY + 1);
- joinThread = thread;
+ Thread rc = thread;
thread = null;
+ return rc;
+ } else {
+ if( shutdownCountDown.decrementAndGet()==0 && onShutdown!=null) {
+ onShutdown.run();
+ }
}
- }
- if (joinThread != null) {
- // thread.interrupt();
- joinThread.join();
+ return null;
}
}
@@ -200,9 +205,9 @@
public void run() {
- if (dispatcherPool != null) {
+ if (spi != null) {
// Inform the dispatcher that we have started:
- dispatcherPool.onDispatcherStarted((DispatcherThread) this);
+ spi.onDispatcherStarted((DispatcherThread) this);
}
PriorityDispatchContext pdc;
@@ -216,14 +221,14 @@
}
if (pdc.tracker != null) {
- dispatcherPool.setCurrentDispatchContext(pdc);
+ spi.setCurrentDispatchContext(pdc);
}
counter++;
pdc.run();
if (pdc.tracker != null) {
- dispatcherPool.setCurrentDispatchContext(null);
+ spi.setCurrentDispatchContext(null);
}
}
@@ -263,8 +268,8 @@
} catch (Throwable thrown) {
thrown.printStackTrace();
} finally {
- if (dispatcherPool != null) {
- dispatcherPool.onDispatcherStopped((DispatcherThread) this);
+ if (spi != null) {
+ spi.onDispatcherStopped((DispatcherThread) this);
}
cleanup();
}
@@ -421,8 +426,8 @@
}
private final DispatcherThread getCurrentDispatcher() {
- if (dispatcherPool != null) {
- return (DispatcherThread) dispatcherPool.getCurrentDispatcher();
+ if (spi != null) {
+ return (DispatcherThread) spi.getCurrentDispatcher();
} else if (Thread.currentThread() == thread) {
return (DispatcherThread) this;
} else {
@@ -432,7 +437,7 @@
}
private final PooledDispatchContext getCurrentDispatchContext() {
- return dispatcherPool.getCurrentDispatchContext();
+ return spi.getCurrentDispatchContext();
}
/**
@@ -464,8 +469,8 @@
this.runnable = runnable;
this.name = name;
this.currentOwner = (DispatcherThread) DispatcherThread.this;
- if (persistent && dispatcherPool != null) {
- this.tracker = dispatcherPool.getLoadBalancer().createExecutionTracker((PooledDispatchContext) this);
+ if (persistent && spi != null) {
+ this.tracker = spi.getLoadBalancer().createExecutionTracker((PooledDispatchContext) this);
} else {
this.tracker = null;
}
@@ -688,4 +693,5 @@
public String getName() {
return name;
}
+
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/GlobalDispatchQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/GlobalDispatchQueue.java?rev=887333&r1=887332&r2=887333&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/GlobalDispatchQueue.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/GlobalDispatchQueue.java Fri Dec 4 19:48:21 2009
@@ -29,15 +29,13 @@
public class GlobalDispatchQueue implements DispatchQueue {
private final String label;
- private final AdancedDispatchSPI system;
- private final DispatcherPool dispatcher;
+ private final AdvancedDispatchSPI spi;
private final DispatchQueuePriority priority;
- public GlobalDispatchQueue(AdancedDispatchSPI system, DispatchQueuePriority priority) {
- this.system = system;
+ public GlobalDispatchQueue(AdvancedDispatchSPI spi, DispatchQueuePriority priority) {
+ this.spi = spi;
this.priority = priority;
this.label=priority.toString();
- this.dispatcher = this.system.pooledDispatcher;
}
public String getLabel() {
@@ -45,11 +43,11 @@
}
public void dispatchAsync(Runnable runnable) {
- dispatcher.execute(runnable, priority.ordinal());
+ spi.execute(runnable, priority.ordinal());
}
public void dispatchAfter(long delayMS, Runnable runnable) {
- dispatcher.schedule(runnable, priority.ordinal(), delayMS, TimeUnit.MILLISECONDS);
+ spi.schedule(runnable, priority.ordinal(), delayMS, TimeUnit.MILLISECONDS);
}
public void dispatchSync(final Runnable runnable) throws InterruptedException {
Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/DispatcherThread.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/DispatcherThread.java?rev=887333&r1=887332&r2=887333&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/DispatcherThread.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/DispatcherThread.java Fri Dec 4 19:48:21 2009
@@ -29,9 +29,9 @@
final public class DispatcherThread extends Thread {
private static final int MAX_DISPATCH_BEFORE_CHECKING_FOR_HIGHER_PRIO = 10000;
private final SimpleDispatchSPI spi;
- private final ThreadDispatchQueue[] threadQueues;
+ final ThreadDispatchQueue[] threadQueues;
final AtomicLong threadQueuedRunnables = new AtomicLong();
-
+
public DispatcherThread(SimpleDispatchSPI spi, int ordinal) {
this.spi = spi;
this.threadQueues = new ThreadDispatchQueue[3];
@@ -44,42 +44,43 @@
@Override
public void run() {
- outer: while( true ) {
- int counter=0;
- for (SimpleQueue queue : threadQueues) {
- DispatchSystem.CURRENT_QUEUE.set(queue);
- Runnable runnable;
- while( (runnable = queue.poll())!=null ) {
- dispatch(runnable);
- counter++;
+ try {
+ outer: while( true ) {
+ int counter=0;
+ for (SimpleQueue queue : threadQueues) {
+ DispatchSystem.CURRENT_QUEUE.set(queue);
+ Runnable runnable;
+ while( (runnable = queue.poll())!=null ) {
+ dispatch(runnable);
+ counter++;
+ }
+ }
+ if( counter!=0 ) {
+ // don't service the global queues until the thread queues are
+ // drained.
+ continue;
}
- }
- if( counter!=0 ) {
- // don't service the global queues until the thread queues are
- // drained.
- continue;
- }
-
- for (SimpleQueue queue : spi.globalQueues) {
- DispatchSystem.CURRENT_QUEUE.set(threadQueues[queue.getPriority().ordinal()]);
- Runnable runnable;
- while( (runnable = queue.poll())!=null ) {
- dispatch(runnable);
- counter++;
+ for (SimpleQueue queue : spi.globalQueues) {
+ DispatchSystem.CURRENT_QUEUE.set(threadQueues[queue.getPriority().ordinal()]);
- // Thread queues have the priority.
- if( threadQueuedRunnables.get()!=0 ) {
- continue outer;
+ Runnable runnable;
+ while( (runnable = queue.poll())!=null ) {
+ dispatch(runnable);
+ counter++;
+
+ // Thread queues have the priority.
+ if( threadQueuedRunnables.get()!=0 ) {
+ continue outer;
+ }
}
}
- }
- if( counter!=0 ) {
- // don't wait for wake up until we could find
- // no runnables to dispatch.
- continue;
- }
-
+ if( counter!=0 ) {
+ // don't wait for wake up until we could find
+ // no runnables to dispatch.
+ continue;
+ }
+
// GlobalDispatchQueue[] globalQueues = spi.globalQueues;
// while( true ) {
//
@@ -93,14 +94,20 @@
// continue;
// }
//
- try {
- waitForWakeup();
- } catch (InterruptedException e) {
- e.printStackTrace();
- return;
+ try {
+ waitForWakeup();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ return;
+ }
}
+ } catch (Shutdown e) {
}
}
+
+ @SuppressWarnings("serial")
+ static class Shutdown extends RuntimeException {
+ }
private boolean dispatch(SimpleQueue queue) {
int counter=0;
@@ -122,6 +129,8 @@
private void dispatch(Runnable runnable) {
try {
runnable.run();
+ } catch (Shutdown e) {
+ throw e;
} catch (Throwable e) {
e.printStackTrace();
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleDispatchSPI.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleDispatchSPI.java?rev=887333&r1=887332&r2=887333&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleDispatchSPI.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleDispatchSPI.java Fri Dec 4 19:48:21 2009
@@ -22,9 +22,9 @@
import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.dispatch.DispatchQueue;
+import org.apache.activemq.dispatch.DispatchSPI;
import org.apache.activemq.dispatch.DispatchSource;
import org.apache.activemq.dispatch.DispatchSystem.DispatchQueuePriority;
-import org.apache.activemq.dispatch.DispatchSystem.DispatchSPI;
import org.apache.activemq.dispatch.internal.SerialDispatchQueue;
import static org.apache.activemq.dispatch.DispatchSystem.DispatchQueuePriority.*;
@@ -35,7 +35,7 @@
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-public class SimpleDispatchSPI extends DispatchSPI {
+public class SimpleDispatchSPI implements DispatchSPI {
final SerialDispatchQueue mainQueue = new SerialDispatchQueue("main");
final GlobalDispatchQueue globalQueues[];
@@ -44,20 +44,14 @@
final ConcurrentLinkedQueue<DispatcherThread> waitingDispatchers = new ConcurrentLinkedQueue<DispatcherThread>();
final AtomicInteger waitingDispatcherCount = new AtomicInteger();
-
+ final AtomicInteger startCounter = new AtomicInteger();
public SimpleDispatchSPI(int size) {
globalQueues = new GlobalDispatchQueue[3];
for (int i = 0; i < 3; i++) {
globalQueues[i] = new GlobalDispatchQueue(this, DispatchQueuePriority.values()[i] );
}
-
dispatchers = new DispatcherThread[size];
- for (int i = 0; i < size; i++) {
- dispatchers[i] = new DispatcherThread(this, i);
- dispatchers[i].start();
-
- }
}
public DispatchQueue getMainQueue() {
@@ -98,5 +92,32 @@
}
}
+ public void start() {
+ if( startCounter.getAndIncrement()==0 ) {
+ for (int i = 0; i < dispatchers.length; i++) {
+ dispatchers[i] = new DispatcherThread(this, i);
+ dispatchers[i].start();
+ }
+ }
+ }
+
+ public void shutdown(final Runnable onShutdown) {
+ if( startCounter.decrementAndGet()==0 ) {
+
+ final AtomicInteger shutdownCountDown = new AtomicInteger(dispatchers.length);
+ for (int i = 0; i < dispatchers.length; i++) {
+ ThreadDispatchQueue queue = dispatchers[i].threadQueues[LOW.ordinal()];
+ queue.runnables.add(new Runnable() {
+ public void run() {
+ if( shutdownCountDown.decrementAndGet()==0 && onShutdown!=null) {
+ onShutdown.run();
+ }
+ throw new DispatcherThread.Shutdown();
+ }
+ });
+ }
+ }
+ }
+
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/DispatchSystemTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/DispatchSystemTest.java?rev=887333&r1=887332&r2=887333&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/DispatchSystemTest.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/DispatchSystemTest.java Fri Dec 4 19:48:21 2009
@@ -18,8 +18,7 @@
import java.util.concurrent.CountDownLatch;
-import org.apache.activemq.dispatch.DispatchSystem.DispatchSPI;
-import org.apache.activemq.dispatch.internal.advanced.AdancedDispatchSPI;
+import org.apache.activemq.dispatch.internal.advanced.AdvancedDispatchSPI;
import org.apache.activemq.dispatch.internal.simple.SimpleDispatchSPI;
import static java.lang.String.*;
@@ -30,15 +29,35 @@
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
public class DispatchSystemTest {
+
+ public static class RunnableCountDownLatch extends CountDownLatch implements Runnable {
+ public RunnableCountDownLatch(int count) {
+ super(count);
+ }
+ public void run() {
+ countDown();
+ }
+ }
public static void main(String[] args) throws Exception {
- DispatchSPI advancedSystem = new AdancedDispatchSPI(Runtime.getRuntime().availableProcessors());
+ DispatchSPI advancedSystem = new AdvancedDispatchSPI(Runtime.getRuntime().availableProcessors(), 3);
+ advancedSystem.start();
benchmark("advanced global queue", advancedSystem, advancedSystem.getGlobalQueue(DEFAULT));
benchmark("advanced private serial queue", advancedSystem, advancedSystem.createQueue("test"));
+ RunnableCountDownLatch latch = new RunnableCountDownLatch(1);
+ advancedSystem.shutdown(latch);
+ latch.await();
+
DispatchSPI simpleSystem = new SimpleDispatchSPI(Runtime.getRuntime().availableProcessors());
+ simpleSystem.start();
+
benchmark("simple global queue", simpleSystem, simpleSystem.getGlobalQueue(DEFAULT));
benchmark("simple private serial queue", simpleSystem, simpleSystem.createQueue("test"));
+
+ latch = new RunnableCountDownLatch(1);
+ simpleSystem.shutdown(latch);
+ latch.await();
}
private static void benchmark(String name, DispatchSPI spi, DispatchQueue queue) throws InterruptedException {
Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/internal/advanced/DispatcherPoolTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/internal/advanced/DispatcherPoolTest.java?rev=887333&r1=887332&r2=887333&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/internal/advanced/DispatcherPoolTest.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/internal/advanced/DispatcherPoolTest.java Fri Dec 4 19:48:21 2009
@@ -28,7 +28,7 @@
public class DispatcherPoolTest {
public static void main(String[] args) throws Exception {
- DispatcherPool pooledDispatcher = new DispatcherPool("default", Runtime.getRuntime().availableProcessors(), 3);
+ AdvancedDispatchSPI pooledDispatcher = new AdvancedDispatchSPI(Runtime.getRuntime().availableProcessors(), 3);
pooledDispatcher.start();
// warm the JIT up..
@@ -46,7 +46,7 @@
System.out.println(format("duration: %,.3f ms, rate: %,.2f executions/sec", durationMS, rate));
}
- private static void benchmarkWork(final DispatcherPool pooledDispatcher, int iterations) throws InterruptedException {
+ private static void benchmarkWork(final AdvancedDispatchSPI pooledDispatcher, int iterations) throws InterruptedException {
final CountDownLatch counter = new CountDownLatch(iterations);
for (int i = 0; i < 1000; i++) {
Work dispatchable = new Work(counter, pooledDispatcher);
@@ -59,9 +59,9 @@
private final CountDownLatch counter;
private final DispatchContext context;
- private Work(CountDownLatch counter, DispatcherPool pooledDispatcher) {
+ private Work(CountDownLatch counter, AdvancedDispatchSPI spi) {
this.counter = counter;
- this.context = pooledDispatcher.register(this , "test");
+ this.context = spi.register(this , "test");
}
public void run() {
Modified: activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/main/java/org/apache/activemq/apollo/jaxb/DispatcherXml.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/main/java/org/apache/activemq/apollo/jaxb/DispatcherXml.java?rev=887333&r1=887332&r2=887333&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/main/java/org/apache/activemq/apollo/jaxb/DispatcherXml.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/main/java/org/apache/activemq/apollo/jaxb/DispatcherXml.java Fri Dec 4 19:48:21 2009
@@ -22,7 +22,8 @@
import javax.xml.bind.annotation.XmlRootElement;
import org.apache.activemq.apollo.broker.Broker;
-import org.apache.activemq.dispatch.internal.advanced.Dispatcher;
+import org.apache.activemq.dispatch.internal.advanced.AdvancedDispatchSPI;
+import org.apache.activemq.dispatch.internal.advanced.AdvancedDispatchSPI;
import org.apache.activemq.dispatch.internal.advanced.DispatcherThread;
@XmlRootElement(name="dispatcher")
@@ -36,12 +37,12 @@
@XmlAttribute(required=false)
int threads = Runtime.getRuntime().availableProcessors();
- public Dispatcher createDispatcher(BrokerXml brokerXml) {
+ public AdvancedDispatchSPI createDispatcher(BrokerXml brokerXml) {
if( name == null ) {
// VirtualHostXml vh = brokerXml.getDefaultVirtualHost();
name = "Broker: ";
}
- return DispatcherThread.createPriorityDispatchPool(name, maxPriority, threads);
+ return new AdvancedDispatchSPI(threads, maxPriority);
}
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/test/java/org/apache/activemq/apollo/jaxb/JAXBConfigTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/test/java/org/apache/activemq/apollo/jaxb/JAXBConfigTest.java?rev=887333&r1=887332&r2=887333&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/test/java/org/apache/activemq/apollo/jaxb/JAXBConfigTest.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/test/java/org/apache/activemq/apollo/jaxb/JAXBConfigTest.java Fri Dec 4 19:48:21 2009
@@ -26,7 +26,7 @@
import org.apache.activemq.apollo.broker.Broker;
import org.apache.activemq.apollo.broker.BrokerFactory;
import org.apache.activemq.broker.store.memory.MemoryStore;
-import org.apache.activemq.dispatch.internal.advanced.DispatcherPool;
+import org.apache.activemq.dispatch.internal.advanced.AdvancedDispatchSPI;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.junit.Test;
@@ -44,9 +44,9 @@
LOG.info("Loading broker configuration from the classpath with URI: " + uri);
Broker broker = BrokerFactory.createBroker(uri);
- DispatcherPool p = (DispatcherPool)broker.getDispatcher();
- assertEquals(4, p.getSize());
- assertEquals("test dispatcher", p.getName());
+ AdvancedDispatchSPI p = (AdvancedDispatchSPI)broker.getDispatcher();
+// assertEquals(4, p.getSize());
+// assertEquals("test dispatcher", p.getName());
assertEquals(1, broker.getTransportServers().size());
Modified: activemq/sandbox/activemq-apollo-actor/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java?rev=887333&r1=887332&r2=887333&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java Fri Dec 4 19:48:21 2009
@@ -39,9 +39,8 @@
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.ProducerId;
+import org.apache.activemq.dispatch.internal.advanced.AdvancedDispatchSPI;
import org.apache.activemq.dispatch.internal.advanced.DispatchContext;
-import org.apache.activemq.dispatch.internal.advanced.Dispatcher;
-import org.apache.activemq.dispatch.internal.advanced.DispatcherThread;
import org.apache.activemq.flow.AbstractLimitedFlowResource;
import org.apache.activemq.flow.Flow;
import org.apache.activemq.flow.FlowController;
@@ -65,7 +64,7 @@
private static int PERFORMANCE_SAMPLES = 5;
- Dispatcher dispatcher;
+ AdvancedDispatchSPI dispatcher;
BrokerDatabase database;
BrokerQueueStore queueStore;
private static final boolean USE_KAHA_DB = true;
@@ -82,12 +81,8 @@
protected ArrayList<Producer> producers = new ArrayList<Producer>();
protected ArrayList<IQueue<Long, MessageDelivery>> queues = new ArrayList<IQueue<Long, MessageDelivery>>();
- protected Dispatcher createDispatcher() {
- if (THREAD_POOL_SIZE > 1) {
- return DispatcherThread.createPriorityDispatchPool("TestDispatcher", Broker.MAX_PRIORITY, THREAD_POOL_SIZE);
- } else {
- return DispatcherThread.createPriorityDispatcher("TestDispatcher", Broker.MAX_PRIORITY);
- }
+ protected AdvancedDispatchSPI createDispatcher() {
+ return new AdvancedDispatchSPI(THREAD_POOL_SIZE, Broker.MAX_PRIORITY);
}
protected int consumerStartDelay = 0;
Modified: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/AbstractFlowQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/AbstractFlowQueue.java?rev=887333&r1=887332&r2=887333&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/AbstractFlowQueue.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/AbstractFlowQueue.java Fri Dec 4 19:48:21 2009
@@ -20,7 +20,7 @@
import java.util.Collection;
import org.apache.activemq.dispatch.internal.advanced.DispatchContext;
-import org.apache.activemq.dispatch.internal.advanced.Dispatcher;
+import org.apache.activemq.dispatch.internal.advanced.AdvancedDispatchSPI;
import org.apache.activemq.flow.ISinkController.FlowControllable;
/**
@@ -31,7 +31,7 @@
*/
public abstract class AbstractFlowQueue<E> extends AbstractFlowRelay<E> implements FlowControllable<E>, IFlowQueue<E> {
- protected Dispatcher dispatcher;
+ protected AdvancedDispatchSPI dispatcher;
protected DispatchContext dispatchContext;
protected Collection<IPollableFlowSource.FlowReadyListener<E>> readyListeners;
private boolean notifyReady = false;
@@ -132,7 +132,7 @@
* @param dispatcher
* The dispatcher to handle messages.
*/
- public synchronized void setDispatcher(Dispatcher dispatcher) {
+ public synchronized void setDispatcher(AdvancedDispatchSPI dispatcher) {
this.dispatcher = dispatcher;
dispatchContext = dispatcher.register(new Runnable(){
public void run() {
Modified: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/IFlowQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/IFlowQueue.java?rev=887333&r1=887332&r2=887333&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/IFlowQueue.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/IFlowQueue.java Fri Dec 4 19:48:21 2009
@@ -16,7 +16,7 @@
*/
package org.apache.activemq.queue;
-import org.apache.activemq.dispatch.internal.advanced.Dispatcher;
+import org.apache.activemq.dispatch.internal.advanced.AdvancedDispatchSPI;
import org.apache.activemq.flow.IFlowRelay;
public interface IFlowQueue<E> extends IBlockingFlowSource<E>, IPollableFlowSource<E>, IFlowRelay<E> {
@@ -57,7 +57,7 @@
* @param dispatcher
* The dispatcher to be used by the queue.
*/
- public void setDispatcher(Dispatcher dispatcher);
+ public void setDispatcher(AdvancedDispatchSPI dispatcher);
/**
* Sets the base dispatch priority for the queue. Setting to higher value
Modified: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/IQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/IQueue.java?rev=887333&r1=887332&r2=887333&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/IQueue.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/IQueue.java Fri Dec 4 19:48:21 2009
@@ -16,7 +16,7 @@
*/
package org.apache.activemq.queue;
-import org.apache.activemq.dispatch.internal.advanced.Dispatcher;
+import org.apache.activemq.dispatch.internal.advanced.AdvancedDispatchSPI;
import org.apache.activemq.queue.QueueStore.PersistentQueue;
import org.apache.activemq.util.Mapper;
@@ -47,7 +47,7 @@
* @param dispatcher
* The dispatcher to be used by the queue.
*/
- public void setDispatcher(Dispatcher dispatcher);
+ public void setDispatcher(AdvancedDispatchSPI dispatcher);
/**
* Sets the base dispatch priority for the queue. Setting to higher value
Modified: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/PartitionedQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/PartitionedQueue.java?rev=887333&r1=887332&r2=887333&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/PartitionedQueue.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/PartitionedQueue.java Fri Dec 4 19:48:21 2009
@@ -20,7 +20,7 @@
import java.util.HashMap;
import java.util.HashSet;
-import org.apache.activemq.dispatch.internal.advanced.Dispatcher;
+import org.apache.activemq.dispatch.internal.advanced.AdvancedDispatchSPI;
import org.apache.activemq.flow.ISourceController;
import org.apache.activemq.util.Mapper;
import org.apache.activemq.util.buffer.AsciiBuffer;
@@ -30,7 +30,7 @@
protected HashSet<Subscription<V>> subscriptions = new HashSet<Subscription<V>>();
private HashMap<Integer, IQueue<K, V>> partitions = new HashMap<Integer, IQueue<K, V>>();
protected QueueStore<K, V> store;
- protected Dispatcher dispatcher;
+ protected AdvancedDispatchSPI dispatcher;
protected boolean started;
protected boolean shutdown = false;
protected QueueDescriptor queueDescriptor;
@@ -239,7 +239,7 @@
this.autoRelease = autoRelease;
}
- public void setDispatcher(Dispatcher dispatcher) {
+ public void setDispatcher(AdvancedDispatchSPI dispatcher) {
checkShutdown();
this.dispatcher = dispatcher;
synchronized (this) {
Modified: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/AbstractTestConnection.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/AbstractTestConnection.java?rev=887333&r1=887332&r2=887333&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/AbstractTestConnection.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/AbstractTestConnection.java Fri Dec 4 19:48:21 2009
@@ -10,7 +10,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.activemq.dispatch.internal.advanced.Dispatcher;
+import org.apache.activemq.dispatch.internal.advanced.AdvancedDispatchSPI;
import org.apache.activemq.flow.AbstractLimiter;
import org.apache.activemq.flow.Flow;
import org.apache.activemq.flow.IFlowLimiter;
@@ -65,12 +65,12 @@
private final int inputWindowSize = 1000;
private final int inputResumeThreshold = 500;
- private Dispatcher dispatcher;
+ private AdvancedDispatchSPI dispatcher;
private final AtomicBoolean stopping = new AtomicBoolean(false);
protected boolean blockingTransport = false;
ExecutorService blockingWriter;
- public static void setInShutdown(boolean val, Dispatcher dispatcher) {
+ public static void setInShutdown(boolean val, AdvancedDispatchSPI dispatcher) {
if (val != inShutdown.getAndSet(val)) {
if (val) {
if (USE_RATE_BASED_LIMITER) {
@@ -274,11 +274,11 @@
this.priorityLevels = priorityLevels;
}
- public Dispatcher getDispatcher() {
+ public AdvancedDispatchSPI getDispatcher() {
return dispatcher;
}
- public void setDispatcher(Dispatcher dispatcher) {
+ public void setDispatcher(AdvancedDispatchSPI dispatcher) {
this.dispatcher = dispatcher;
}
@@ -455,12 +455,12 @@
protected static class RateBasedLimiterCollector implements Runnable {
- private Dispatcher dispatcher;
+ private AdvancedDispatchSPI dispatcher;
private int samplingPeriod = 50;
private boolean scheduled = false;
private HashSet<RateBasedLimiter> limiters = new HashSet<RateBasedLimiter>();
- public synchronized void setDispatcher(Dispatcher d) {
+ public synchronized void setDispatcher(AdvancedDispatchSPI d) {
if (d != dispatcher) {
scheduled = false;
dispatcher = d;
Modified: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockBroker.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockBroker.java?rev=887333&r1=887332&r2=887333&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockBroker.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockBroker.java Fri Dec 4 19:48:21 2009
@@ -21,8 +21,9 @@
import java.util.HashMap;
import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.activemq.dispatch.internal.advanced.DispatcherAware;
-import org.apache.activemq.dispatch.internal.advanced.Dispatcher;
+import org.apache.activemq.dispatch.DispatcherAware;
+import org.apache.activemq.dispatch.internal.advanced.AdvancedDispatchSPI;
+import org.apache.activemq.dispatch.internal.advanced.AdvancedDispatchSPI;
import org.apache.activemq.dispatch.internal.advanced.DispatcherThread;
import org.apache.activemq.flow.IFlowSink;
import org.apache.activemq.flow.Commands.Destination;
@@ -53,7 +54,7 @@
private TransportServer transportServer;
private String uri;
private String name;
- protected Dispatcher dispatcher;
+ protected AdvancedDispatchSPI dispatcher;
private final AtomicBoolean stopping = new AtomicBoolean();
private boolean useInputQueues = false;
@@ -159,7 +160,7 @@
error.printStackTrace();
}
- public Dispatcher getDispatcher() {
+ public AdvancedDispatchSPI getDispatcher() {
return dispatcher;
}
@@ -167,7 +168,7 @@
this.name = name;
}
- public void setDispatcher(Dispatcher dispatcher) {
+ public void setDispatcher(AdvancedDispatchSPI dispatcher) {
this.dispatcher = dispatcher;
}
@@ -189,7 +190,7 @@
protected void createDispatcher() {
if (dispatcher == null) {
- dispatcher = DispatcherThread.createPriorityDispatchPool("BrokerDispatcher", Message.MAX_PRIORITY, Runtime.getRuntime().availableProcessors());
+ dispatcher = new AdvancedDispatchSPI(Runtime.getRuntime().availableProcessors(), Message.MAX_PRIORITY);
}
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockBrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockBrokerTest.java?rev=887333&r1=887332&r2=887333&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockBrokerTest.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockBrokerTest.java Fri Dec 4 19:48:21 2009
@@ -20,7 +20,8 @@
import junit.framework.TestCase;
-import org.apache.activemq.dispatch.internal.advanced.Dispatcher;
+import org.apache.activemq.dispatch.internal.advanced.AdvancedDispatchSPI;
+import org.apache.activemq.dispatch.internal.advanced.AdvancedDispatchSPI;
import org.apache.activemq.dispatch.internal.advanced.DispatcherThread;
import org.apache.activemq.flow.Commands.Destination;
import org.apache.activemq.flow.Commands.Destination.DestinationBean;
@@ -63,7 +64,7 @@
protected MockBroker rcvBroker;
protected MockClient client;
- protected Dispatcher dispatcher;
+ protected AdvancedDispatchSPI dispatcher;
static public final Mapper<Long, Message> KEY_MAPPER = new Mapper<Long, Message>() {
public Long map(Message element) {
@@ -94,8 +95,8 @@
}
}
- protected Dispatcher createDispatcher(String name) {
- return DispatcherThread.createPriorityDispatchPool(name, Message.MAX_PRIORITY, threadsPerDispatcher);
+ protected AdvancedDispatchSPI createDispatcher(String name) {
+ return new AdvancedDispatchSPI(threadsPerDispatcher, Message.MAX_PRIORITY);
}
public void test_1_1_0() throws Exception {
@@ -284,7 +285,7 @@
}
}
- Dispatcher clientDispatcher = null;
+ AdvancedDispatchSPI clientDispatcher = null;
if (SEPARATE_CLIENT_DISPATCHER) {
clientDispatcher = createDispatcher("ClientDispatcher");
clientDispatcher.start();
Modified: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockClient.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockClient.java?rev=887333&r1=887332&r2=887333&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockClient.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockClient.java Fri Dec 4 19:48:21 2009
@@ -7,7 +7,8 @@
import java.util.Properties;
import java.util.concurrent.atomic.AtomicLong;
-import org.apache.activemq.dispatch.internal.advanced.Dispatcher;
+import org.apache.activemq.dispatch.internal.advanced.AdvancedDispatchSPI;
+import org.apache.activemq.dispatch.internal.advanced.AdvancedDispatchSPI;
import org.apache.activemq.dispatch.internal.advanced.DispatcherThread;
import org.apache.activemq.flow.Commands.Destination;
import org.apache.activemq.flow.Commands.Destination.DestinationBean;
@@ -43,7 +44,7 @@
protected ArrayList<MetricCounter> additionalReportMetrics = new ArrayList<MetricCounter>();
protected boolean includeDetailedRates = false;
- protected Dispatcher dispatcher;
+ protected AdvancedDispatchSPI dispatcher;
public RemoteConsumer consumer(int index) {
return consumers.get(index);
@@ -214,7 +215,7 @@
return testName;
}
- public void setDispatcher(Dispatcher dispatcher) {
+ public void setDispatcher(AdvancedDispatchSPI dispatcher) {
this.dispatcher = dispatcher;
}
@@ -274,13 +275,13 @@
}
}
- public Dispatcher getDispatcher() {
+ public AdvancedDispatchSPI getDispatcher() {
return dispatcher;
}
- protected Dispatcher createDispatcher() {
+ protected AdvancedDispatchSPI createDispatcher() {
if (dispatcher == null) {
- dispatcher = DispatcherThread.createPriorityDispatchPool("ClientDispatcher", numPriorities, threadsPerDispatcher);
+ dispatcher = new AdvancedDispatchSPI(threadsPerDispatcher, numPriorities);
}
return dispatcher;
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/DispatchableTransport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/DispatchableTransport.java?rev=887333&r1=887332&r2=887333&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/DispatchableTransport.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/DispatchableTransport.java Fri Dec 4 19:48:21 2009
@@ -16,7 +16,7 @@
*/
package org.apache.activemq.transport;
-import org.apache.activemq.dispatch.internal.advanced.DispatcherAware;
+import org.apache.activemq.dispatch.DispatcherAware;
public interface DispatchableTransport extends Transport, DispatcherAware {
Modified: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java?rev=887333&r1=887332&r2=887333&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java Fri Dec 4 19:48:21 2009
@@ -14,7 +14,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.dispatch.internal.advanced.DispatchContext;
-import org.apache.activemq.dispatch.internal.advanced.Dispatcher;
+import org.apache.activemq.dispatch.internal.advanced.AdvancedDispatchSPI;
import org.apache.activemq.transport.DispatchableTransport;
import org.apache.activemq.transport.FutureResponse;
import org.apache.activemq.transport.ResponseCallback;
@@ -75,7 +75,7 @@
}
}
- public void setDispatcher(Dispatcher dispatcher) {
+ public void setDispatcher(AdvancedDispatchSPI dispatcher) {
readContext = dispatcher.register(new Runnable() {
public void run() {
dispatch();