You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/12/04 16:32:13 UTC
svn commit: r887228 - in /activemq/sandbox/activemq-apollo-actor:
activemq-broker/src/test/java/org/apache/activemq/broker/
activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/
activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/i...
Author: chirino
Date: Fri Dec 4 15:32:12 2009
New Revision: 887228
URL: http://svn.apache.org/viewvc?rev=887228&view=rev
Log:
More refactoring in an effort to eventually unify the advanced package model with the dispatch package model.
Added:
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchObserver.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispachableAdapter.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatchContext.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/Dispatchable.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/internal/
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/internal/advanced/
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/internal/advanced/DispatcherPoolTest.java
Modified:
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/RemoteProducer.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchObject.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchSystem.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/AbstractDispatchObject.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/Dispatcher.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherPool.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherThread.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/GlobalDispatchQueue.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/LoadBalancer.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/PooledDispatchContext.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/SimpleLoadBalancer.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/ThreadDispatchQueue.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/DispatcherThread.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/GlobalDispatchQueue.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/ThreadDispatchQueue.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/DispatchSystemTest.java
activemq/sandbox/activemq-apollo-actor/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java
activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/AbstractFlowQueue.java
activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/RemoteProducer.java
activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/RemoteProducer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/RemoteProducer.java?rev=887228&r1=887227&r2=887228&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/RemoteProducer.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/RemoteProducer.java Fri Dec 4 15:32:12 2009
@@ -6,8 +6,8 @@
import org.apache.activemq.apollo.Connection;
import org.apache.activemq.apollo.broker.Destination;
import org.apache.activemq.apollo.broker.MessageDelivery;
-import org.apache.activemq.dispatch.internal.advanced.Dispatcher.DispatchContext;
-import org.apache.activemq.dispatch.internal.advanced.Dispatcher.Dispatchable;
+import org.apache.activemq.dispatch.internal.advanced.DispatchContext;
+import org.apache.activemq.dispatch.internal.advanced.Dispatchable;
import org.apache.activemq.flow.IFlowController;
import org.apache.activemq.flow.IFlowSink;
import org.apache.activemq.flow.ISinkController;
Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchObject.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchObject.java?rev=887228&r1=887227&r2=887228&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchObject.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchObject.java Fri Dec 4 15:32:12 2009
@@ -29,4 +29,5 @@
public void setFinalizer(Runnable finalizer);
public void setTargetQueue(DispatchQueue queue);
+ public DispatchQueue getTargetQueue();
}
Added: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchObserver.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchObserver.java?rev=887228&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchObserver.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchObserver.java Fri Dec 4 15:32:12 2009
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.dispatch;
+
+import org.apache.activemq.dispatch.internal.advanced.DispatchContext;
+import org.apache.activemq.dispatch.internal.advanced.Dispatcher;
+import org.apache.activemq.dispatch.internal.advanced.PooledDispatchContext;
+
+public interface DispatchObserver {
+
+ /**
+ * Should be called whenever {@link DispatchContext#requestDispatch()} is called.
+ * This assists the load balancer in determining relationships between {@link DispatchContext}s.
+ * @param caller The calling dispatcher
+ * @param context The context from which the dispatch is requested.
+ */
+ public void onDispatch(Dispatcher caller, PooledDispatchContext context);
+
+ /**
+ * Must be called by the dispatcher when a {@link DispatchContext} is closed.
+ */
+ public void close();
+}
\ No newline at end of file
Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchSystem.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchSystem.java?rev=887228&r1=887227&r2=887228&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchSystem.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchSystem.java Fri Dec 4 15:32:12 2009
@@ -45,29 +45,38 @@
return CURRENT_QUEUE.get();
}
- private final static DispatchSPI spi = cretateDispatchSystemSPI();
+ private static DispatchSPI spi;
+
private static DispatchSPI cretateDispatchSystemSPI() {
return new SimpleDispatchSPI(Runtime.getRuntime().availableProcessors());
}
+ synchronized private static DispatchSPI spi() {
+ if(spi==null) {
+ spi = cretateDispatchSystemSPI();
+ }
+ return spi;
+ }
+
static DispatchQueue getMainQueue() {
- return spi.getMainQueue();
+ return spi().getMainQueue();
}
static public DispatchQueue getGlobalQueue(DispatchQueuePriority priority) {
- return spi.getGlobalQueue(priority);
+ return spi().getGlobalQueue(priority);
}
static DispatchQueue createQueue(String label) {
- return spi.createQueue(label);
+ return spi().createQueue(label);
}
static void dispatchMain() {
- spi.dispatchMain();
+ spi().dispatchMain();
}
static DispatchSource createSource(SelectableChannel channel, int interestOps, DispatchQueue queue) {
- return spi.createSource(channel, interestOps, queue);
+ return spi().createSource(channel, interestOps, queue);
}
+
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/AbstractDispatchObject.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/AbstractDispatchObject.java?rev=887228&r1=887227&r2=887228&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/AbstractDispatchObject.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/AbstractDispatchObject.java Fri Dec 4 15:32:12 2009
@@ -46,5 +46,7 @@
this.targetQueue = targetQueue;
}
-
+ public DispatchQueue getTargetQueue() {
+ return this.targetQueue;
+ }
}
Added: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispachableAdapter.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispachableAdapter.java?rev=887228&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispachableAdapter.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispachableAdapter.java Fri Dec 4 15:32:12 2009
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.dispatch.internal.advanced;
+
+
+public class DispachableAdapter implements Runnable {
+
+ public static final int MAX_DISPATCH_PER_LOOP = 20;
+
+ private Dispatchable dispatchable;
+
+ private final DispatcherPool pool;
+
+ public DispachableAdapter(DispatcherPool pool, Dispatchable dispatchable) {
+ this.pool = pool;
+ this.dispatchable = dispatchable;
+ }
+
+ public void run() {
+ int processed=0;
+ while( true ) {
+ if( dispatchable.dispatch() ) {
+ break;
+ }
+ processed++;
+ if( processed > MAX_DISPATCH_PER_LOOP ) {
+ // Still not done.. so we re-enqueue the dispatch request
+ // and exit the current run loop.
+ pool.getCurrentDispatchContext().requestDispatch();
+ break;
+ }
+ }
+ }
+}
\ No newline at end of file
Added: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatchContext.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatchContext.java?rev=887228&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatchContext.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatchContext.java Fri Dec 4 15:32:12 2009
@@ -0,0 +1,44 @@
+package org.apache.activemq.dispatch.internal.advanced;
+
+import java.util.concurrent.RejectedExecutionException;
+
+
+/**
+ * Returned to callers registered with this dispatcher. Used by the caller
+ * to inform the dispatcher that it is ready for dispatch.
+ *
+ * Note that DispatchContext is not safe for concurrent access by multiple
+ * threads.
+ */
+public interface DispatchContext {
+ /**
+ * Once registered with a dispatcher, this can be called to request
+ * dispatch. The {@link Dispatchable} will remain in the dispatch queue
+ * until a subsequent call to {@link Dispatchable#dispatch()} returns
+ * true (a false return means the dispatchable still has more work to do);
+ *
+ * @throws RejectedExecutionException If the dispatcher has been shutdown.
+ */
+ public void requestDispatch() throws RejectedExecutionException;
+
+ /**
+ * This can be called to update the dispatch priority.
+ *
+ * @param priority
+ */
+ public void updatePriority(int priority);
+
+ /**
+ * Gets the name of the dispatch context
+ *
+ * @return The name of this dispatch context
+ */
+ public String getName();
+
+ /**
+ * This must be called to release any resource the dispatcher is holding
+ * on behalf of this context. Once called this {@link DispatchContext} should
+ * no longer be used.
+ */
+ public void close(boolean sync);
+}
\ No newline at end of file
Added: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/Dispatchable.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/Dispatchable.java?rev=887228&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/Dispatchable.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/Dispatchable.java Fri Dec 4 15:32:12 2009
@@ -0,0 +1,15 @@
+package org.apache.activemq.dispatch.internal.advanced;
+
+/**
+ * This interface is implemented by Dispatchable entities. A Dispatchable
+ * entity registers with a {@link Dispatcher} and is returned a
+ * {@link DispatchContext} which it can use to request the
+ * {@link Dispatcher} to invoke {@link Dispatchable#dispatch()}
+ *
+ * {@link Dispatcher} guarantees that it will never invoke
+ * {@link #dispatch()} concurrently unless the {@link Dispatchable} is
+ * registered with more than one {@link Dispatcher}.
+ */
+public interface Dispatchable {
+ public boolean dispatch();
+}
\ No newline at end of file
Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/Dispatcher.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/Dispatcher.java?rev=887228&r1=887227&r2=887228&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/Dispatcher.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/Dispatcher.java Fri Dec 4 15:32:12 2009
@@ -17,92 +17,11 @@
package org.apache.activemq.dispatch.internal.advanced;
import java.util.concurrent.Executor;
-import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
public interface Dispatcher extends Executor {
/**
- * This interface is implemented by Dispatchable entities. A Dispatchable
- * entity registers with an {@link Dispatcher} and is returned a
- * {@link DispatchContext} which it can use to request the
- * {@link Dispatcher} to invoke {@link Dispatchable#dispatch()}
- *
- * {@link Dispatcher} guarantees that {@link #dispatch()} will never invoke
- * dispatch concurrently unless the {@link Dispatchable} is registered with
- * more than one {@link Dispatcher};
- */
- public interface Dispatchable {
- public boolean dispatch();
- }
-
- /**
- * Returned to callers registered with this dispathcer. Used by the caller
- * to inform the dispatcher that it is ready for dispatch.
- *
- * Note that DispatchContext is not safe for concurrent access by multiple
- * threads.
- */
- public interface DispatchContext {
- /**
- * Once registered with a dispatcher, this can be called to request
- * dispatch. The {@link Dispatchable} will remain in the dispatch queue
- * until a subsequent call to {@link Dispatchable#dispatch()} returns
- * false;
- *
- * @throws RejectedExecutionException If the dispatcher has been shutdown.
- */
- public void requestDispatch() throws RejectedExecutionException;
-
- /**
- * This can be called to update the dispatch priority.
- *
- * @param priority
- */
- public void updatePriority(int priority);
-
- /**
- * Gets the Dispatchable that this context represents.
- *
- * @return The dispatchable
- */
- public Dispatchable getDispatchable();
-
- /**
- * Gets the name of the dispatch context
- *
- * @return The dispatchable
- */
- public String getName();
-
- /**
- * This must be called to release any resource the dispatcher is holding
- * on behalf of this context. Once called this {@link DispatchContext} should
- * no longer be used.
- */
- public void close(boolean sync);
- }
-
- public class RunnableAdapter implements Dispatchable, Runnable {
- private Runnable runnable;
-
- public RunnableAdapter() {
- runnable = this;
- }
- public RunnableAdapter(Runnable runnable) {
- this.runnable = runnable;
- }
-
- public boolean dispatch() {
- runnable.run();
- return true;
- }
-
- public void run() {
- }
- }
-
- /**
* Registers a {@link Dispatchable} with this dispatcher, and returns a
* {@link DispatchContext} that the caller can use to request dispatch.
*
@@ -113,6 +32,8 @@
* @return A {@link DispatchContext} that can be used to request dispatch
*/
public DispatchContext register(Dispatchable dispatchable, String name);
+
+ public DispatchContext register(Runnable runnable, String name);
/**
* Gets the number of dispatch priorities. Dispatch priorities are 0 based,
Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherPool.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherPool.java?rev=887228&r1=887227&r2=887228&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherPool.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherPool.java Fri Dec 4 15:32:12 2009
@@ -167,6 +167,10 @@
return chooseDispatcher().register(dispatchable, name);
}
+ public DispatchContext register(Runnable runnable, String name) {
+ return chooseDispatcher().register(runnable, name);
+ }
+
public String toString() {
return name;
}
@@ -182,7 +186,7 @@
public final Executor createPriorityExecutor(final int priority) {
return new Executor() {
public void execute(final Runnable runnable) {
- chooseDispatcher().dispatch(new RunnableAdapter(runnable), priority);
+ chooseDispatcher().dispatch(runnable, priority);
}
};
@@ -194,11 +198,11 @@
}
public void execute(Runnable command) {
- chooseDispatcher().dispatch(new RunnableAdapter(command), 0);
+ chooseDispatcher().dispatch(command, 0);
}
public void execute(Runnable command, int priority) {
- chooseDispatcher().dispatch(new RunnableAdapter(command), priority);
+ chooseDispatcher().dispatch(command, priority);
}
public void schedule(final Runnable runnable, long delay, TimeUnit timeUnit) {
Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherThread.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherThread.java?rev=887228&r1=887227&r2=887228&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherThread.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherThread.java Fri Dec 4 15:32:12 2009
@@ -25,9 +25,9 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.activemq.dispatch.DispatchObserver;
import org.apache.activemq.dispatch.DispatchSystem;
import org.apache.activemq.dispatch.DispatchSystem.DispatchQueuePriority;
-import org.apache.activemq.dispatch.internal.advanced.LoadBalancer.ExecutionTracker;
import org.apache.activemq.util.Mapper;
import org.apache.activemq.util.PriorityLinkedList;
import org.apache.activemq.util.TimerHeap;
@@ -46,7 +46,7 @@
protected final HashSet<PriorityDispatchContext> contexts = new HashSet<PriorityDispatchContext>();
// Set if this dispatcher is part of a dispatch pool:
- protected final DispatcherPool pooledDispatcher;
+ protected final DispatcherPool dispatcherPool;
// The local dispatch queue:
protected final PriorityLinkedList<PriorityDispatchContext> priorityQueue;
@@ -88,7 +88,7 @@
for (int i = 0; i < 2; i++) {
foreignQueue[i] = new LinkedNodeList<ForeignEvent>();
}
- this.pooledDispatcher = pooledDispactcher;
+ this.dispatcherPool = pooledDispactcher;
}
public static final Dispatcher createPriorityDispatcher(String name, int numPriorities) {
@@ -145,7 +145,11 @@
}
public DispatchContext register(Dispatchable dispatchable, String name) {
- return new PriorityDispatchContext(dispatchable, true, name);
+ return register(new DispachableAdapter(dispatcherPool, dispatchable), name);
+ }
+
+ public DispatchContext register(Runnable runnable, String name) {
+ return new PriorityDispatchContext(runnable, true, name);
}
/*
@@ -170,7 +174,7 @@
Thread joinThread = null;
synchronized (this) {
if (thread != null) {
- dispatchInternal(new RunnableAdapter() {
+ dispatchInternal(new Runnable() {
public void run() {
running = false;
}
@@ -200,52 +204,37 @@
public void run() {
- if (pooledDispatcher != null) {
+ if (dispatcherPool != null) {
// Inform the dispatcher that we have started:
- pooledDispatcher.onDispatcherStarted((DispatcherThread) this);
+ dispatcherPool.onDispatcherStarted((DispatcherThread) this);
}
PriorityDispatchContext pdc;
try {
- final int MAX_DISPATCH_PER_LOOP = 20;
- int processed = 0;
-
while (running) {
- pdc = priorityQueue.poll();
+ int counter = 0;
// If no local work available wait for foreign work:
- if (pdc == null) {
- waitForEvents();
- } else {
+ while((pdc = priorityQueue.poll())!=null){
if( pdc.priority < dispatchQueues.length ) {
DispatchSystem.CURRENT_QUEUE.set(dispatchQueues[pdc.priority]);
}
if (pdc.tracker != null) {
- pooledDispatcher.setCurrentDispatchContext(pdc);
+ dispatcherPool.setCurrentDispatchContext(pdc);
}
- while (!pdc.dispatch()) {
- processed++;
- if (processed > MAX_DISPATCH_PER_LOOP || pdc.listPrio < priorityQueue.getHighestPriority()) {
- // Give other dispatchables a shot:
- // May have gotten relinked by the caller:
- if (!pdc.isLinked()) {
- priorityQueue.add(pdc, pdc.listPrio);
- }
- break;
- }
- }
+ counter++;
+ pdc.run();
if (pdc.tracker != null) {
- pooledDispatcher.setCurrentDispatchContext(null);
+ dispatcherPool.setCurrentDispatchContext(null);
}
+ }
- if (processed < MAX_DISPATCH_PER_LOOP) {
- continue;
- }
+ if( counter==0 ) {
+ waitForEvents();
}
- processed = 0;
// Execute delayed events:
timerHeap.executeReadyTimers();
@@ -271,7 +260,6 @@
fe.unlink();
fe.execute();
}
-
}
}
} catch (InterruptedException e) {
@@ -279,8 +267,8 @@
} catch (Throwable thrown) {
thrown.printStackTrace();
} finally {
- if (pooledDispatcher != null) {
- pooledDispatcher.onDispatcherStopped((DispatcherThread) this);
+ if (dispatcherPool != null) {
+ dispatcherPool.onDispatcherStopped((DispatcherThread) this);
}
cleanup();
}
@@ -362,8 +350,8 @@
}
//Special dispatch method that allow high priority dispatch:
- private final void dispatchInternal(Dispatchable dispatchable, int priority) {
- PriorityDispatchContext context = new PriorityDispatchContext(dispatchable, false, name);
+ private final void dispatchInternal(Runnable runnable, int priority) {
+ PriorityDispatchContext context = new PriorityDispatchContext(runnable, false, name);
context.priority = priority;
context.requestDispatch();
}
@@ -375,8 +363,8 @@
* org.apache.activemq.dispatch.IDispatcher#dispatch(org.apache.activemq
* .dispatch.Dispatcher.Dispatchable)
*/
- public final void dispatch(Dispatchable dispatchable, int priority) {
- PriorityDispatchContext context = new PriorityDispatchContext(dispatchable, false, name);
+ public final void dispatch(Runnable runnable, int priority) {
+ PriorityDispatchContext context = new PriorityDispatchContext(runnable, false, name);
context.updatePriority(priority);
context.requestDispatch();
}
@@ -391,17 +379,17 @@
return new Executor() {
public void execute(final Runnable runnable) {
- dispatch(new RunnableAdapter(runnable), priority);
+ dispatch(runnable, priority);
}
};
}
public void execute(final Runnable runnable) {
- dispatch(new RunnableAdapter(runnable), 0);
+ dispatch(runnable, 0);
}
public void execute(final Runnable runnable, int prio) {
- dispatch(new RunnableAdapter(runnable), prio);
+ dispatch(runnable, prio);
}
/*
@@ -437,8 +425,8 @@
}
private final DispatcherThread getCurrentDispatcher() {
- if (pooledDispatcher != null) {
- return (DispatcherThread) pooledDispatcher.getCurrentDispatcher();
+ if (dispatcherPool != null) {
+ return (DispatcherThread) dispatcherPool.getCurrentDispatcher();
} else if (Thread.currentThread() == thread) {
return (DispatcherThread) this;
} else {
@@ -448,15 +436,15 @@
}
private final PooledDispatchContext getCurrentDispatchContext() {
- return pooledDispatcher.getCurrentDispatchContext();
+ return dispatcherPool.getCurrentDispatchContext();
}
/**
*
*/
protected class PriorityDispatchContext extends LinkedNode<PriorityDispatchContext> implements PooledDispatchContext {
- // The dispatchable target:
- private final Dispatchable dispatchable;
+ // The target:
+ private final Runnable runnable;
// The name of this context:
final String name;
// list prio can only be updated in the thread of of the owning
@@ -467,7 +455,7 @@
// from foreign threads:
final UpdateEvent updateEvent[];
- private final ExecutionTracker tracker;
+ private final DispatchObserver tracker;
protected DispatcherThread currentOwner;
private DispatcherThread updateDispatcher = null;
@@ -476,12 +464,12 @@
private boolean closed = false;
final CountDownLatch closeLatch = new CountDownLatch(1);
- protected PriorityDispatchContext(Dispatchable dispatchable, boolean persistent, String name) {
- this.dispatchable = dispatchable;
+ protected PriorityDispatchContext(Runnable runnable, boolean persistent, String name) {
+ this.runnable = runnable;
this.name = name;
this.currentOwner = (DispatcherThread) DispatcherThread.this;
- if (persistent && pooledDispatcher != null) {
- this.tracker = pooledDispatcher.getLoadBalancer().createExecutionTracker((PooledDispatchContext) this);
+ if (persistent && dispatcherPool != null) {
+ this.tracker = dispatcherPool.getLoadBalancer().createExecutionTracker((PooledDispatchContext) this);
} else {
this.tracker = null;
}
@@ -502,7 +490,7 @@
*
* @return the execution tracker for the context:
*/
- public ExecutionTracker getExecutionTracker() {
+ public DispatchObserver getExecutionTracker() {
return tracker;
}
@@ -511,11 +499,11 @@
*
* @return False if the dispatchable has more work to do.
*/
- public final boolean dispatch() {
- return dispatchable.dispatch();
+ public final void run() {
+ runnable.run();
}
- public final void assignToNewDispatcher(Dispatcher newDispatcher) {
+ public final void setTargetQueue(Dispatcher newDispatcher) {
synchronized (this) {
// If we're already set to this dispatcher
@@ -538,7 +526,7 @@
DispatcherThread callingDispatcher = getCurrentDispatcher();
if (tracker != null)
- tracker.onDispatchRequest(callingDispatcher, getCurrentDispatchContext());
+ tracker.onDispatch(callingDispatcher, getCurrentDispatchContext());
// Otherwise this is coming off another thread, so we need to
// synchronize
@@ -691,11 +679,7 @@
return getName();
}
- public Dispatchable getDispatchable() {
- return dispatchable;
- }
-
- public DispatcherThread getDispatcher() {
+ public DispatcherThread getTargetQueue() {
return currentOwner;
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/GlobalDispatchQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/GlobalDispatchQueue.java?rev=887228&r1=887227&r2=887228&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/GlobalDispatchQueue.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/GlobalDispatchQueue.java Fri Dec 4 15:32:12 2009
@@ -84,4 +84,7 @@
throw new UnsupportedOperationException();
}
+ public DispatchQueue getTargetQueue() {
+ throw new UnsupportedOperationException(); // no target queue is reported by this queue: the call always throws
+ }
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/LoadBalancer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/LoadBalancer.java?rev=887228&r1=887227&r2=887228&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/LoadBalancer.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/LoadBalancer.java Fri Dec 4 15:32:12 2009
@@ -16,27 +16,12 @@
*/
package org.apache.activemq.dispatch.internal.advanced;
+import org.apache.activemq.dispatch.DispatchObserver;
+
-import org.apache.activemq.dispatch.internal.advanced.Dispatcher.DispatchContext;
public interface LoadBalancer {
- public interface ExecutionTracker {
-
- /**
- * Should be called when a {@link DispatchContext#requestDispatch()} is called.
- * This assists the load balancer in determining relationships between {@link DispatchContext}s
- * @param caller The calling dispatcher
- * @param context The context from which the dispatch is requested.
- */
- public void onDispatchRequest(Dispatcher caller, PooledDispatchContext context);
-
- /**
- * Must be called by the dispatcher when a {@link DispatchContext} is closed.
- */
- public void close();
- }
-
/**
* Must be called by a dispatch thread when it starts
* @param dispatcher The dispatcher
@@ -50,11 +35,11 @@
public void onDispatcherStopped(Dispatcher dispatcher);
/**
- * Gets an {@link ExecutionTracker} for the dispatch context.
+ * Gets a {@link DispatchObserver} for the dispatch context.
* @param context
* @return
*/
- public ExecutionTracker createExecutionTracker(PooledDispatchContext context);
+ public DispatchObserver createExecutionTracker(PooledDispatchContext context);
/**
* Starts execution tracking
Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/PooledDispatchContext.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/PooledDispatchContext.java?rev=887228&r1=887227&r2=887228&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/PooledDispatchContext.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/PooledDispatchContext.java Fri Dec 4 15:32:12 2009
@@ -1,7 +1,6 @@
package org.apache.activemq.dispatch.internal.advanced;
-import org.apache.activemq.dispatch.internal.advanced.LoadBalancer.ExecutionTracker;
-import org.apache.activemq.dispatch.internal.advanced.Dispatcher.DispatchContext;
+import org.apache.activemq.dispatch.DispatchObserver;
/**
* A {@link PooledDispatchContext}s can be moved between different
@@ -12,7 +11,7 @@
* Called to transfer a {@link PooledDispatchContext} to a new
* Dispatcher.
*/
- public void assignToNewDispatcher(Dispatcher newDispatcher);
+ public void setTargetQueue(Dispatcher newDispatcher);
/**
* Gets the dispatcher to which this PooledDispatchContext currently
@@ -20,12 +19,12 @@
*
* @return
*/
- public Dispatcher getDispatcher();
+ public Dispatcher getTargetQueue();
/**
* Gets the execution tracker for the context.
*
* @return the execution tracker for the context:
*/
- public ExecutionTracker getExecutionTracker();
+ public DispatchObserver getExecutionTracker();
}
\ No newline at end of file
Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/SimpleLoadBalancer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/SimpleLoadBalancer.java?rev=887228&r1=887227&r2=887228&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/SimpleLoadBalancer.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/SimpleLoadBalancer.java Fri Dec 4 15:32:12 2009
@@ -22,6 +22,8 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.activemq.dispatch.DispatchObserver;
+
public class SimpleLoadBalancer implements LoadBalancer {
@@ -97,7 +99,7 @@
dispatchers.remove(dispatcher);
}
- public ExecutionTracker createExecutionTracker(PooledDispatchContext context) {
+ public DispatchObserver createExecutionTracker(PooledDispatchContext context) {
return new SimpleExecutionTracker(context);
}
@@ -116,7 +118,7 @@
}
}
- private class SimpleExecutionTracker implements ExecutionTracker {
+ private class SimpleExecutionTracker implements DispatchObserver {
private final HashMap<PooledDispatchContext, ExecutionStats> sources = new HashMap<PooledDispatchContext, ExecutionStats>();
private final PooledDispatchContext context;
private final AtomicInteger work = new AtomicInteger(0);
@@ -126,7 +128,7 @@
SimpleExecutionTracker(PooledDispatchContext context) {
this.context = context;
- currentOwner = context.getDispatcher();
+ currentOwner = context.getTargetQueue();
}
/**
@@ -143,7 +145,7 @@
* @return True if this method resulted in the dispatch request being
* assigned to another dispatcher.
*/
- public void onDispatchRequest(Dispatcher callingDispatcher, PooledDispatchContext callingContext) {
+ public void onDispatch(Dispatcher callingDispatcher, PooledDispatchContext callingContext) {
if (callingContext != null) {
// Make sure we are being called by another node:
@@ -166,7 +168,7 @@
System.out.println("Assigning: " + context + " to " + callingContext + "'s dispatcher: " + callingDispatcher + " From: " + currentOwner);
currentOwner = callingDispatcher;
- context.assignToNewDispatcher(callingDispatcher);
+ context.setTargetQueue(callingDispatcher);
}
} else {
Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/ThreadDispatchQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/ThreadDispatchQueue.java?rev=887228&r1=887227&r2=887228&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/ThreadDispatchQueue.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/ThreadDispatchQueue.java Fri Dec 4 15:32:12 2009
@@ -82,4 +82,8 @@
throw new UnsupportedOperationException();
}
+ public DispatchQueue getTargetQueue() {
+ throw new UnsupportedOperationException(); // no target queue is reported by this queue: the call always throws
+ }
+
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/DispatcherThread.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/DispatcherThread.java?rev=887228&r1=887227&r2=887228&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/DispatcherThread.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/DispatcherThread.java Fri Dec 4 15:32:12 2009
@@ -27,6 +27,7 @@
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
final public class DispatcherThread extends Thread {
+ private static final int MAX_DISPATCH_BEFORE_CHECKING_FOR_HIGHER_PRIO = 10000;
private final SimpleDispatchSPI spi;
private final ThreadDispatchQueue[] threadQueues;
final AtomicLong threadQueuedRunnables = new AtomicLong();
@@ -78,6 +79,20 @@
// no runnables to dispatch.
continue;
}
+
+// GlobalDispatchQueue[] globalQueues = spi.globalQueues;
+// while( true ) {
+//
+// if( dispatch(threadQueues[0])
+// || dispatch(globalQueues[0])
+// || dispatch(threadQueues[1])
+// || dispatch(globalQueues[1])
+// || dispatch(threadQueues[2])
+// || dispatch(globalQueues[2])
+// ) {
+// continue;
+// }
+//
try {
waitForWakeup();
} catch (InterruptedException e) {
@@ -87,6 +102,23 @@
}
}
+ private boolean dispatch(SimpleQueue queue) { // runs a bounded batch of runnables polled from queue; returns true if at least one ran
+ int counter=0; // number of runnables executed in this batch
+ Runnable runnable;
+ while( counter < MAX_DISPATCH_BEFORE_CHECKING_FOR_HIGHER_PRIO ) { // cap the batch; NOTE(review): the constant's name suggests this lets the caller re-check higher-priority queues - confirm
+ runnable = queue.poll();
+ if( runnable == null ) { // poll() produced nothing more to run, so end the batch early
+ break;
+ }
+ if( counter==0 ) { // record the queue as current only once, just before the first runnable of the batch
+ DispatchSystem.CURRENT_QUEUE.set(queue);
+ }
+ dispatch(runnable);
+ counter++;
+ }
+ return counter!=0; // false means the queue yielded no work at all
+ }
+
private void dispatch(Runnable runnable) {
try {
runnable.run();
Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/GlobalDispatchQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/GlobalDispatchQueue.java?rev=887228&r1=887227&r2=887228&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/GlobalDispatchQueue.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/GlobalDispatchQueue.java Fri Dec 4 15:32:12 2009
@@ -87,6 +87,10 @@
public void setTargetQueue(DispatchQueue queue) {
throw new UnsupportedOperationException();
}
+
+ public DispatchQueue getTargetQueue() {
+ throw new UnsupportedOperationException(); // mirrors setTargetQueue() above: the call always throws
+ }
public Runnable poll() {
Runnable rc = runnables.poll();
Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/ThreadDispatchQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/ThreadDispatchQueue.java?rev=887228&r1=887227&r2=887228&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/ThreadDispatchQueue.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/ThreadDispatchQueue.java Fri Dec 4 15:32:12 2009
@@ -114,6 +114,10 @@
public void setTargetQueue(DispatchQueue queue) {
throw new UnsupportedOperationException();
}
+ public DispatchQueue getTargetQueue() {
+ throw new UnsupportedOperationException(); // mirrors setTargetQueue() above: the call always throws
+ }
+
public DispatchQueuePriority getPriority() {
return priority;
Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/DispatchSystemTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/DispatchSystemTest.java?rev=887228&r1=887227&r2=887228&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/DispatchSystemTest.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/DispatchSystemTest.java Fri Dec 4 15:32:12 2009
@@ -32,9 +32,9 @@
public class DispatchSystemTest {
public static void main(String[] args) throws Exception {
-// DispatchSPI advancedSystem = new AdancedDispatchSPI(Runtime.getRuntime().availableProcessors());
-// benchmark("advanced global queue", advancedSystem, advancedSystem.getGlobalQueue(DEFAULT));
-// benchmark("advanced private serial queue", advancedSystem, advancedSystem.createQueue("test"));
+ DispatchSPI advancedSystem = new AdancedDispatchSPI(Runtime.getRuntime().availableProcessors());
+ benchmark("advanced global queue", advancedSystem, advancedSystem.getGlobalQueue(DEFAULT));
+ benchmark("advanced private serial queue", advancedSystem, advancedSystem.createQueue("test"));
DispatchSPI simpleSystem = new SimpleDispatchSPI(Runtime.getRuntime().availableProcessors());
benchmark("simple global queue", simpleSystem, simpleSystem.getGlobalQueue(DEFAULT));
Added: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/internal/advanced/DispatcherPoolTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/internal/advanced/DispatcherPoolTest.java?rev=887228&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/internal/advanced/DispatcherPoolTest.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/internal/advanced/DispatcherPoolTest.java Fri Dec 4 15:32:12 2009
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.dispatch.internal.advanced;
+
+import java.util.concurrent.CountDownLatch;
+
+
+import static java.lang.String.*;
+
+/**
+ * Command-line throughput benchmark for {@link DispatcherPool}: runs a warm-up pass, then times 20 million dispatches and prints the duration and rate.
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class DispatcherPoolTest {
+
+ public static void main(String[] args) throws Exception {
+ DispatcherPool pooledDispatcher = new DispatcherPool("default", Runtime.getRuntime().availableProcessors(), 3); // NOTE(review): meaning of the third argument (3) is not visible here - confirm against DispatcherPool
+ pooledDispatcher.start();
+
+ // warm the JIT up with an untimed pass of 100000 dispatches
+ benchmarkWork(pooledDispatcher, 100000);
+
+ int iterations = 1000*1000*20;
+ long start = System.nanoTime();
+ benchmarkWork(pooledDispatcher, iterations);
+ long end = System.nanoTime();
+
+ double durationMS = 1.0d*(end-start)/1000000d; // nanoseconds -> milliseconds
+ double rate = 1000d * iterations / durationMS; // dispatches per second
+
+ pooledDispatcher.shutdown();
+ System.out.println(format("duration: %,.3f ms, rate: %,.2f executions/sec", durationMS, rate));
+ }
+ // Registers 1000 Work items sharing one latch sized to iterations, requests their first dispatch, and blocks until the latch reaches zero.
+ private static void benchmarkWork(final DispatcherPool pooledDispatcher, int iterations) throws InterruptedException {
+ final CountDownLatch counter = new CountDownLatch(iterations);
+ for (int i = 0; i < 1000; i++) { // the number of work items is fixed at 1000, independent of iterations
+ Work dispatchable = new Work(counter, pooledDispatcher);
+ dispatchable.context.requestDispatch();
+ }
+ counter.await();
+ }
+ // A Dispatchable that registers itself with the pool under the name "test" and keeps its DispatchContext for re-requesting dispatch.
+ private static final class Work implements Dispatchable {
+ private final CountDownLatch counter;
+ private final DispatchContext context;
+
+ private Work(CountDownLatch counter, DispatcherPool pooledDispatcher) {
+ this.counter = counter;
+ this.context = pooledDispatcher.register(this , "test");
+ }
+ // Counts the shared latch down once, then requests another dispatch while the latch is still above zero; always returns true.
+ public boolean dispatch() {
+ counter.countDown();
+ if( counter.getCount()>0 ) {
+ context.requestDispatch();
+ }
+ return true;
+ }
+ }
+
+
+}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java?rev=887228&r1=887227&r2=887228&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java Fri Dec 4 15:32:12 2009
@@ -39,10 +39,10 @@
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.ProducerId;
+import org.apache.activemq.dispatch.internal.advanced.DispatchContext;
+import org.apache.activemq.dispatch.internal.advanced.Dispatchable;
import org.apache.activemq.dispatch.internal.advanced.Dispatcher;
import org.apache.activemq.dispatch.internal.advanced.DispatcherThread;
-import org.apache.activemq.dispatch.internal.advanced.Dispatcher.DispatchContext;
-import org.apache.activemq.dispatch.internal.advanced.Dispatcher.Dispatchable;
import org.apache.activemq.flow.AbstractLimitedFlowResource;
import org.apache.activemq.flow.Flow;
import org.apache.activemq.flow.FlowController;
Modified: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/AbstractFlowQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/AbstractFlowQueue.java?rev=887228&r1=887227&r2=887228&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/AbstractFlowQueue.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/AbstractFlowQueue.java Fri Dec 4 15:32:12 2009
@@ -19,9 +19,9 @@
import java.util.ArrayList;
import java.util.Collection;
+import org.apache.activemq.dispatch.internal.advanced.DispatchContext;
+import org.apache.activemq.dispatch.internal.advanced.Dispatchable;
import org.apache.activemq.dispatch.internal.advanced.Dispatcher;
-import org.apache.activemq.dispatch.internal.advanced.Dispatcher.DispatchContext;
-import org.apache.activemq.dispatch.internal.advanced.Dispatcher.Dispatchable;
import org.apache.activemq.flow.ISinkController.FlowControllable;
/**
Modified: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/RemoteProducer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/RemoteProducer.java?rev=887228&r1=887227&r2=887228&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/RemoteProducer.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/RemoteProducer.java Fri Dec 4 15:32:12 2009
@@ -2,8 +2,8 @@
import java.util.concurrent.atomic.AtomicLong;
-import org.apache.activemq.dispatch.internal.advanced.Dispatcher.DispatchContext;
-import org.apache.activemq.dispatch.internal.advanced.Dispatcher.Dispatchable;
+import org.apache.activemq.dispatch.internal.advanced.DispatchContext;
+import org.apache.activemq.dispatch.internal.advanced.Dispatchable;
import org.apache.activemq.flow.IFlowController;
import org.apache.activemq.flow.ISinkController;
import org.apache.activemq.flow.ISourceController;
Modified: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java?rev=887228&r1=887227&r2=887228&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java Fri Dec 4 15:32:12 2009
@@ -13,9 +13,9 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.activemq.dispatch.internal.advanced.DispatchContext;
+import org.apache.activemq.dispatch.internal.advanced.Dispatchable;
import org.apache.activemq.dispatch.internal.advanced.Dispatcher;
-import org.apache.activemq.dispatch.internal.advanced.Dispatcher.DispatchContext;
-import org.apache.activemq.dispatch.internal.advanced.Dispatcher.Dispatchable;
import org.apache.activemq.transport.DispatchableTransport;
import org.apache.activemq.transport.FutureResponse;
import org.apache.activemq.transport.ResponseCallback;
@@ -77,7 +77,7 @@
}
public void setDispatcher(Dispatcher dispatcher) {
- readContext = dispatcher.register(this, name);
+ readContext = dispatcher.register((Dispatchable)this, name);
}
public void onReadReady(Pipe<Object> pipe) {