You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/12/06 20:54:04 UTC
svn commit: r887755 - in /activemq/sandbox/activemq-apollo-actor:
activemq-broker/src/test/java/org/apache/activemq/broker/
activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/
activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/i...
Author: chirino
Date: Sun Dec 6 19:54:02 2009
New Revision: 887755
URL: http://svn.apache.org/viewvc?rev=887755&view=rev
Log:
Adding a DispatchOption enum to control the thread 'sticky' factor for serial queues.
Added:
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchOption.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/AbstractSerialDispatchQueue.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/SerialDispatchQueue.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SerialDispatchQueue.java
Removed:
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/SerialDispatchQueue.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdvancedSerialDispatchQueue.java
Modified:
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/RemoteProducer.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchQueue.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchSystem.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/Dispatcher.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatcherObserver.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdvancedDispatcher.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/GlobalDispatchQueue.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/ThreadDispatchQueue.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/DispatcherThread.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/GlobalDispatchQueue.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleDispatcher.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleQueue.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/ThreadDispatchQueue.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/TimerThread.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/actor/ActorTest.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/DispatchSystemTest.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/internal/advanced/DispatcherPoolTest.java
activemq/sandbox/activemq-apollo-actor/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java
activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/AbstractFlowQueue.java
activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/RemoteProducer.java
activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/RemoteProducer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/RemoteProducer.java?rev=887755&r1=887754&r2=887755&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/RemoteProducer.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/RemoteProducer.java Sun Dec 6 19:54:02 2009
@@ -15,6 +15,8 @@
import org.apache.activemq.metric.MetricCounter;
import org.apache.activemq.transport.TransportFactory;
+import static org.apache.activemq.dispatch.DispatchOption.*;
+
abstract public class RemoteProducer extends Connection implements FlowUnblockListener<MessageDelivery> {
protected final MetricCounter rate = new MetricCounter();
@@ -59,7 +61,7 @@
setupProducer();
- dispatchQueue = getDispatcher().createSerialQueue(name + "-client");
+ dispatchQueue = getDispatcher().createSerialQueue(name + "-client", STICK_TO_CALLER_THREAD);
dispatchTask = new Runnable(){
public void run() {
dispatch();
Added: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchOption.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchOption.java?rev=887755&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchOption.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchOption.java Sun Dec 6 19:54:02 2009
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.dispatch;
+
+public enum DispatchOption {
+ /**
+ * Updates the target queue to be the
+ * thread queue so that execution 'sticks' to caller's
+ * thread queue.
+ */
+ STICK_TO_CALLER_THREAD,
+
+ /**
+ * Used to update the target queue to be the first
+ * random thread queue that dispatches this queue.
+ */
+ STICK_TO_DISPATCH_THREAD,
+}
\ No newline at end of file
Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchQueue.java?rev=887755&r1=887754&r2=887755&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchQueue.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchQueue.java Sun Dec 6 19:54:02 2009
@@ -16,6 +16,8 @@
*/
package org.apache.activemq.dispatch;
+import java.util.EnumSet;
+import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
@@ -31,7 +33,7 @@
public void dispatchAfter(Runnable runnable, long delay, TimeUnit unit);
public void dispatchApply(int iterations, Runnable runnable) throws InterruptedException;
- String getLabel();
-
+ public String getLabel();
+ public Set<DispatchOption> getOptions();
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchSystem.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchSystem.java?rev=887755&r1=887754&r2=887755&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchSystem.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchSystem.java Sun Dec 6 19:54:02 2009
@@ -18,7 +18,9 @@
import java.nio.channels.SelectableChannel;
+
/**
+ * Provides easy access to a system wide Dispatcher.
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
@@ -27,9 +29,11 @@
final private static Dispatcher dispatcher = create();
private static Dispatcher create() {
- return new DispatcherConfig().createDispatcher();
+ Dispatcher rc = new DispatcherConfig().createDispatcher();
+ rc.retain();
+ return rc;
}
-
+
static DispatchQueue getMainQueue() {
return dispatcher.getMainQueue();
}
@@ -42,8 +46,8 @@
return dispatcher.getGlobalQueue(priority);
}
- static DispatchQueue getSerialQueue(String label) {
- return dispatcher.createSerialQueue(label);
+ static DispatchQueue getSerialQueue(String label, DispatchOption...options) {
+ return dispatcher.createSerialQueue(label, options);
}
static void dispatchMain() {
Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/Dispatcher.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/Dispatcher.java?rev=887755&r1=887754&r2=887755&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/Dispatcher.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/Dispatcher.java Sun Dec 6 19:54:02 2009
@@ -21,11 +21,11 @@
public interface Dispatcher extends Retained {
-
+
public DispatchQueue getGlobalQueue();
public DispatchQueue getGlobalQueue(DispatchPriority priority);
- public DispatchQueue createSerialQueue(String label);
+ public DispatchQueue createSerialQueue(String label, DispatchOption... options);
public DispatchQueue getMainQueue();
public void dispatchMain();
Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatcherObserver.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatcherObserver.java?rev=887755&r1=887754&r2=887755&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatcherObserver.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatcherObserver.java Sun Dec 6 19:54:02 2009
@@ -24,7 +24,7 @@
public void onThreadCreate(DispatcherThread thread);
public void onThreadDestroy(DispatcherThread thread);
- public void onQueueCreate(DispatchQueue queue);
+ public void onQueueCreate(DispatchQueue queue, DispatchOption...options);
public void onQueueDestroy(DispatchQueue queue);
public void onSourceCreate(DispatchSource source);
Added: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/AbstractSerialDispatchQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/AbstractSerialDispatchQueue.java?rev=887755&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/AbstractSerialDispatchQueue.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/AbstractSerialDispatchQueue.java Sun Dec 6 19:54:02 2009
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.dispatch.internal;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.activemq.dispatch.DispatchQueue;
+import org.apache.activemq.dispatch.DispatchOption;
+
+/**
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+abstract public class AbstractSerialDispatchQueue extends AbstractDispatchObject implements DispatchQueue, Runnable {
+
+ private final String label;
+ private final AtomicInteger suspendCounter = new AtomicInteger();
+ private final ConcurrentLinkedQueue<Runnable> runnables = new ConcurrentLinkedQueue<Runnable>();
+ private final AtomicLong size = new AtomicLong();
+ private final Set<DispatchOption> options;
+
+ public AbstractSerialDispatchQueue(String label, DispatchOption...options) {
+ this.label = label;
+ this.options = set(options);
+ retain();
+ }
+
+ static private Set<DispatchOption> set(DispatchOption[] options) {
+ if( options==null || options.length==0 )
+ return Collections.emptySet() ;
+ return Collections.unmodifiableSet(EnumSet.copyOf(Arrays.asList(options)));
+ }
+
+ public String getLabel() {
+ return label;
+ }
+
+ public void resume() {
+ if( suspendCounter.decrementAndGet() == 0 ) {
+ if( size.get() != 0 ) {
+ dispatchSelfAsync();
+ }
+ }
+ }
+
+ public void suspend() {
+ suspendCounter.incrementAndGet();
+ }
+
+ public void dispatchAfter(Runnable runnable, long delay, TimeUnit unit) {
+ throw new RuntimeException("TODO: implement me.");
+ }
+
+ public void execute(Runnable command) {
+ dispatchAsync(command);
+ }
+
+ public void dispatchAsync(Runnable runnable) {
+ if( runnable == null ) {
+ throw new IllegalArgumentException();
+ }
+ long lastSize = size.getAndIncrement();
+ if( lastSize==0 ) {
+ retain();
+ }
+ runnables.add(runnable);
+ if( lastSize == 0 && suspendCounter.get()<=0 ) {
+ dispatchSelfAsync();
+ }
+ }
+
+ protected void dispatchSelfAsync() {
+ targetQueue.dispatchAsync(this);
+ }
+
+ public void run() {
+ Runnable runnable;
+ long lsize = size.get();
+ while( suspendCounter.get() <= 0 && lsize > 0 ) {
+ try {
+ runnable = runnables.poll();
+ if( runnable!=null ) {
+ runnable.run();
+ lsize = size.decrementAndGet();
+ if( lsize==0 ) {
+ release();
+ }
+ }
+ } catch (Throwable e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ public void dispatchSync(Runnable runnable) throws InterruptedException {
+ dispatchApply(1, runnable);
+ }
+
+ public void dispatchApply(int iterations, Runnable runnable) throws InterruptedException {
+ QueueSupport.dispatchApply(this, iterations, runnable);
+ }
+
+ public Set<DispatchOption> getOptions() {
+ return options;
+ }
+
+
+}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdvancedDispatcher.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdvancedDispatcher.java?rev=887755&r1=887754&r2=887755&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdvancedDispatcher.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AdvancedDispatcher.java Sun Dec 6 19:54:02 2009
@@ -24,13 +24,13 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
-import org.apache.activemq.dispatch.Dispatcher;
+import org.apache.activemq.dispatch.DispatchOption;
import org.apache.activemq.dispatch.DispatchPriority;
import org.apache.activemq.dispatch.DispatchQueue;
import org.apache.activemq.dispatch.DispatchSource;
+import org.apache.activemq.dispatch.Dispatcher;
import org.apache.activemq.dispatch.DispatcherConfig;
import org.apache.activemq.dispatch.internal.BaseRetained;
-import org.apache.activemq.dispatch.internal.SerialDispatchQueue;
import static org.apache.activemq.dispatch.DispatchPriority.*;
@@ -183,8 +183,8 @@
return globalQueues[priority.ordinal()];
}
- public DispatchQueue createSerialQueue(String label) {
- AdvancedSerialDispatchQueue rc = new AdvancedSerialDispatchQueue(label);
+ public DispatchQueue createSerialQueue(String label, DispatchOption... options) {
+ SerialDispatchQueue rc = new SerialDispatchQueue(label, options);
rc.setTargetQueue(getGlobalQueue());
return rc;
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/GlobalDispatchQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/GlobalDispatchQueue.java?rev=887755&r1=887754&r2=887755&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/GlobalDispatchQueue.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/GlobalDispatchQueue.java Sun Dec 6 19:54:02 2009
@@ -16,8 +16,12 @@
*/
package org.apache.activemq.dispatch.internal.advanced;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.Set;
import java.util.concurrent.TimeUnit;
+import org.apache.activemq.dispatch.DispatchOption;
import org.apache.activemq.dispatch.DispatchQueue;
import org.apache.activemq.dispatch.DispatchPriority;
import org.apache.activemq.dispatch.internal.QueueSupport;
@@ -95,4 +99,9 @@
public void retain() {
}
+
+ public Set<DispatchOption> getOptions() {
+ return Collections.emptySet();
+ }
+
}
Added: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/SerialDispatchQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/SerialDispatchQueue.java?rev=887755&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/SerialDispatchQueue.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/SerialDispatchQueue.java Sun Dec 6 19:54:02 2009
@@ -0,0 +1,32 @@
+package org.apache.activemq.dispatch.internal.advanced;
+
+import org.apache.activemq.dispatch.DispatchOption;
+import org.apache.activemq.dispatch.DispatchQueue;
+import org.apache.activemq.dispatch.internal.AbstractSerialDispatchQueue;
+
+public class SerialDispatchQueue extends AbstractSerialDispatchQueue {
+
+ public SerialDispatchQueue(String label, DispatchOption...options) {
+ super(label, options);
+// context = new DispatchContext(this, true, label);
+}
+
+// @Override
+// protected void dispatchSelfAsync() {
+// AdvancedQueue aq = ((AdvancedQueue)targetQueue);
+// super.dispatchSelfAsync();
+// }
+
+ @Override
+ public void run() {
+ DispatchQueue original = AdvancedDispatcher.CURRENT_QUEUE.get();
+ AdvancedDispatcher.CURRENT_QUEUE.set(this);
+ try {
+ super.run();
+ } finally {
+ AdvancedDispatcher.CURRENT_QUEUE.set(original);
+ }
+ }
+
+
+}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/ThreadDispatchQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/ThreadDispatchQueue.java?rev=887755&r1=887754&r2=887755&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/ThreadDispatchQueue.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/ThreadDispatchQueue.java Sun Dec 6 19:54:02 2009
@@ -16,8 +16,12 @@
*/
package org.apache.activemq.dispatch.internal.advanced;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.Set;
import java.util.concurrent.TimeUnit;
+import org.apache.activemq.dispatch.DispatchOption;
import org.apache.activemq.dispatch.DispatchQueue;
import org.apache.activemq.dispatch.DispatchPriority;
import org.apache.activemq.dispatch.internal.QueueSupport;
@@ -95,5 +99,10 @@
public void retain() {
}
+
+ public Set<DispatchOption> getOptions() {
+ return Collections.emptySet();
+ }
+
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/DispatcherThread.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/DispatcherThread.java?rev=887755&r1=887754&r2=887755&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/DispatcherThread.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/DispatcherThread.java Sun Dec 6 19:54:02 2009
@@ -19,8 +19,6 @@
import java.util.concurrent.atomic.AtomicLong;
-import org.apache.activemq.dispatch.DispatchPriority;
-
/**
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -33,9 +31,9 @@
public DispatcherThread(SimpleDispatcher dispatcher, int ordinal) {
this.dispatcher = dispatcher;
- this.threadQueues = new ThreadDispatchQueue[3];
- for (int i = 0; i < 3; i++) {
- threadQueues[i] = new ThreadDispatchQueue(this, DispatchPriority.values()[i] );
+ this.threadQueues = new ThreadDispatchQueue[dispatcher.globalQueues.length];
+ for (int i = 0; i < threadQueues.length; i++) {
+ threadQueues[i] = new ThreadDispatchQueue(this, dispatcher.globalQueues[i]);
}
setName(dispatcher.getLabel()+" dispatcher: "+(ordinal+1));
setDaemon(true);
@@ -43,11 +41,12 @@
@Override
public void run() {
+ GlobalDispatchQueue[] globalQueues = dispatcher.globalQueues;
try {
outer: while( true ) {
int counter=0;
- for (SimpleQueue queue : threadQueues) {
- SimpleDispatcher.CURRENT_QUEUE.set(queue);
+ for (ThreadDispatchQueue queue : threadQueues) {
+ SimpleDispatcher.CURRENT_QUEUE.set(queue.globalQueue);
Runnable runnable;
while( (runnable = queue.poll())!=null ) {
dispatch(runnable);
@@ -60,8 +59,8 @@
continue;
}
- for (SimpleQueue queue : dispatcher.globalQueues) {
- SimpleDispatcher.CURRENT_QUEUE.set(threadQueues[queue.getPriority().ordinal()]);
+ for (SimpleQueue queue : globalQueues) {
+ SimpleDispatcher.CURRENT_QUEUE.set(queue);
Runnable runnable;
while( (runnable = queue.poll())!=null ) {
@@ -134,6 +133,14 @@
e.printStackTrace();
}
}
+
+ public static DispatcherThread currentDispatcherThread() {
+ Thread currentThread = Thread.currentThread();
+ if( currentThread.getClass() == DispatcherThread.class ) {
+ return (DispatcherThread) currentThread;
+ }
+ return null;
+ }
private final Object wakeupMutex = new Object();
private boolean inWaitingList;
Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/GlobalDispatchQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/GlobalDispatchQueue.java?rev=887755&r1=887754&r2=887755&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/GlobalDispatchQueue.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/GlobalDispatchQueue.java Sun Dec 6 19:54:02 2009
@@ -16,10 +16,14 @@
*/
package org.apache.activemq.dispatch.internal.simple;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.activemq.dispatch.DispatchOption;
import org.apache.activemq.dispatch.DispatchQueue;
import org.apache.activemq.dispatch.DispatchPriority;
import org.apache.activemq.dispatch.internal.QueueSupport;
@@ -28,7 +32,7 @@
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-public class GlobalDispatchQueue implements SimpleQueue {
+final public class GlobalDispatchQueue implements SimpleQueue {
private final SimpleDispatcher dispatcher;
final String label;
@@ -69,6 +73,32 @@
QueueSupport.dispatchApply(this, iterations, runnable);
}
+ public ThreadDispatchQueue getTargetQueue() {
+ DispatcherThread thread = DispatcherThread.currentDispatcherThread();
+ if( thread == null ) {
+ return null;
+ }
+ return thread.threadQueues[priority.ordinal()];
+ }
+
+ public Runnable poll() {
+ Runnable rc = runnables.poll();
+ if( rc !=null ) {
+ counter.decrementAndGet();
+ }
+ return rc;
+ }
+
+ public DispatchPriority getPriority() {
+ return priority;
+ }
+
+ public void release() {
+ }
+
+ public void retain() {
+ }
+
public void resume() {
throw new UnsupportedOperationException();
}
@@ -93,25 +123,20 @@
throw new UnsupportedOperationException();
}
- public DispatchQueue getTargetQueue() {
- throw new UnsupportedOperationException();
- }
-
- public Runnable poll() {
- Runnable rc = runnables.poll();
- if( rc !=null ) {
- counter.decrementAndGet();
- }
- return rc;
+ public Set<DispatchOption> getOptions() {
+ return Collections.emptySet();
}
- public DispatchPriority getPriority() {
- return priority;
+ public GlobalDispatchQueue isGlobalDispatchQueue() {
+ return this;
}
- public void release() {
+ public SerialDispatchQueue isSerialDispatchQueue() {
+ return null;
}
- public void retain() {
+ public ThreadDispatchQueue isThreadDispatchQueue() {
+ return null;
}
+
}
Added: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SerialDispatchQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SerialDispatchQueue.java?rev=887755&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SerialDispatchQueue.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SerialDispatchQueue.java Sun Dec 6 19:54:02 2009
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.dispatch.internal.simple;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.dispatch.DispatchOption;
+import org.apache.activemq.dispatch.DispatchPriority;
+import org.apache.activemq.dispatch.DispatchQueue;
+import org.apache.activemq.dispatch.internal.AbstractSerialDispatchQueue;
+
+public final class SerialDispatchQueue extends AbstractSerialDispatchQueue implements SimpleQueue {
+
+ private final SimpleDispatcher dispatcher;
+ private volatile boolean stickToThreadOnNextDispatch;
+ private volatile boolean stickToThreadOnNextDispatchRequest;
+
+ SerialDispatchQueue(SimpleDispatcher dispatcher, String label, DispatchOption...options) {
+ super(label, options);
+ this.dispatcher = dispatcher;
+ }
+
+ @Override
+ public void setTargetQueue(DispatchQueue targetQueue) {
+ GlobalDispatchQueue global = ((SimpleQueue)targetQueue).isGlobalDispatchQueue();
+ if( getOptions().contains(DispatchOption.STICK_TO_CALLER_THREAD) && global!=null ) {
+ stickToThreadOnNextDispatchRequest=true;
+ }
+ super.setTargetQueue(targetQueue);
+ }
+
+ @Override
+ public void dispatchAfter(Runnable runnable, long delay, TimeUnit unit) {
+ dispatcher.timerThread.addRelative(runnable, this, delay, unit);
+ }
+
+ @Override
+ protected void dispatchSelfAsync() {
+ if( stickToThreadOnNextDispatchRequest ) {
+ SimpleQueue current = SimpleDispatcher.CURRENT_QUEUE.get();
+ if( current!=null ) {
+ SimpleQueue parent;
+ while( (parent = current.getTargetQueue()) !=null ) {
+ current = parent;
+ }
+ if( current.isThreadDispatchQueue()==null ) {
+ System.out.println("crap");
+ }
+ super.setTargetQueue(current);
+ stickToThreadOnNextDispatchRequest=false;
+ }
+ }
+ super.dispatchSelfAsync();
+ }
+
+ @Override
+ public void run() {
+ SimpleQueue current = SimpleDispatcher.CURRENT_QUEUE.get();
+ if( stickToThreadOnNextDispatch ) {
+ stickToThreadOnNextDispatch=false;
+ GlobalDispatchQueue global = current.isGlobalDispatchQueue();
+ if( global!=null ) {
+ setTargetQueue(global.getTargetQueue());
+ }
+ }
+ SimpleDispatcher.CURRENT_QUEUE.set(this);
+ try {
+ super.run();
+ } finally {
+ SimpleDispatcher.CURRENT_QUEUE.set(current);
+ }
+ }
+
+ public DispatchPriority getPriority() {
+ throw new UnsupportedOperationException();
+ }
+
+ public Runnable poll() {
+ throw new UnsupportedOperationException();
+ }
+
+ public GlobalDispatchQueue isGlobalDispatchQueue() {
+ return null;
+ }
+
+ public SerialDispatchQueue isSerialDispatchQueue() {
+ return this;
+ }
+
+ public ThreadDispatchQueue isThreadDispatchQueue() {
+ return null;
+ }
+
+ public SimpleQueue getTargetQueue() {
+ return (SimpleQueue) targetQueue;
+ }
+}
\ No newline at end of file
Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleDispatcher.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleDispatcher.java?rev=887755&r1=887754&r2=887755&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleDispatcher.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleDispatcher.java Sun Dec 6 19:54:02 2009
@@ -18,7 +18,6 @@
import java.nio.channels.SelectableChannel;
import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -27,8 +26,9 @@
import org.apache.activemq.dispatch.Dispatcher;
import org.apache.activemq.dispatch.DispatchSource;
import org.apache.activemq.dispatch.DispatcherConfig;
+import org.apache.activemq.dispatch.DispatchOption;
import org.apache.activemq.dispatch.internal.BaseRetained;
-import org.apache.activemq.dispatch.internal.SerialDispatchQueue;
+import org.apache.activemq.dispatch.internal.AbstractSerialDispatchQueue;
import static org.apache.activemq.dispatch.DispatchPriority.*;
@@ -41,9 +41,9 @@
*/
final public class SimpleDispatcher extends BaseRetained implements Dispatcher {
- public final static ThreadLocal<DispatchQueue> CURRENT_QUEUE = new ThreadLocal<DispatchQueue>();
+ public final static ThreadLocal<SimpleQueue> CURRENT_QUEUE = new ThreadLocal<SimpleQueue>();
- final SerialDispatchQueue mainQueue = new SerialDispatchQueue("main");
+ final SerialDispatchQueue mainQueue = new SerialDispatchQueue(this, "main");
final GlobalDispatchQueue globalQueues[];
final DispatcherThread dispatchers[];
final AtomicLong globalQueuedRunnables = new AtomicLong();
@@ -74,13 +74,8 @@
return globalQueues[priority.ordinal()];
}
- public DispatchQueue createSerialQueue(String label) {
- SerialDispatchQueue rc = new SerialDispatchQueue(label) {
- @Override
- public void dispatchAfter(Runnable runnable, long delay, TimeUnit unit) {
- timerThread.addRelative(runnable, this, delay, unit);
- }
- };
+ public DispatchQueue createSerialQueue(String label, DispatchOption... options) {
+ AbstractSerialDispatchQueue rc = new SerialDispatchQueue(this, label, options);
rc.setTargetQueue(getGlobalQueue());
return rc;
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleQueue.java?rev=887755&r1=887754&r2=887755&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleQueue.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleQueue.java Sun Dec 6 19:54:02 2009
@@ -7,4 +7,11 @@
Runnable poll();
DispatchPriority getPriority();
+
+ SerialDispatchQueue isSerialDispatchQueue();
+ ThreadDispatchQueue isThreadDispatchQueue();
+ GlobalDispatchQueue isGlobalDispatchQueue();
+
+ SimpleQueue getTargetQueue();
+
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/ThreadDispatchQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/ThreadDispatchQueue.java?rev=887755&r1=887754&r2=887755&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/ThreadDispatchQueue.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/ThreadDispatchQueue.java Sun Dec 6 19:54:02 2009
@@ -16,32 +16,35 @@
*/
package org.apache.activemq.dispatch.internal.simple;
+import java.util.Collections;
import java.util.LinkedList;
+import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
-import org.apache.activemq.dispatch.DispatchQueue;
+import org.apache.activemq.dispatch.DispatchOption;
import org.apache.activemq.dispatch.DispatchPriority;
+import org.apache.activemq.dispatch.DispatchQueue;
import org.apache.activemq.dispatch.internal.QueueSupport;
/**
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-public class ThreadDispatchQueue implements SimpleQueue {
+final public class ThreadDispatchQueue implements SimpleQueue {
final String label;
final LinkedList<Runnable> localRunnables = new LinkedList<Runnable>();
final ConcurrentLinkedQueue<Runnable> runnables = new ConcurrentLinkedQueue<Runnable>();
- private DispatcherThread dispatcher;
+ final DispatcherThread dispatcher;
final AtomicLong counter;
- private final DispatchPriority priority;
+ final GlobalDispatchQueue globalQueue;
- public ThreadDispatchQueue(DispatcherThread dispatcher, DispatchPriority priority) {
+ public ThreadDispatchQueue(DispatcherThread dispatcher, GlobalDispatchQueue globalQueue) {
this.dispatcher = dispatcher;
- this.priority = priority;
- this.label=priority.toString();
+ this.globalQueue = globalQueue;
+ this.label="thread local "+globalQueue.getLabel();
this.counter = dispatcher.threadQueuedRunnables;
}
@@ -119,12 +122,13 @@
public void setTargetQueue(DispatchQueue queue) {
throw new UnsupportedOperationException();
}
- public DispatchQueue getTargetQueue() {
- throw new UnsupportedOperationException();
+
+ public SimpleQueue getTargetQueue() {
+ return null;
}
public DispatchPriority getPriority() {
- return priority;
+ return globalQueue.getPriority();
}
public void release() {
@@ -133,4 +137,20 @@
public void retain() {
}
+ public Set<DispatchOption> getOptions() {
+ return Collections.emptySet();
+ }
+
+ public GlobalDispatchQueue isGlobalDispatchQueue() {
+ return null;
+ }
+
+ public SerialDispatchQueue isSerialDispatchQueue() {
+ return null;
+ }
+
+ public ThreadDispatchQueue isThreadDispatchQueue() {
+ return this;
+ }
+
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/TimerThread.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/TimerThread.java?rev=887755&r1=887754&r2=887755&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/TimerThread.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/TimerThread.java Sun Dec 6 19:54:02 2009
@@ -8,7 +8,7 @@
import static org.apache.activemq.dispatch.internal.simple.TimerThread.Type.*;
-public class TimerThread extends Thread {
+final public class TimerThread extends Thread {
enum Type {
RELATIVE,
ABSOLUTE,
Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/actor/ActorTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/actor/ActorTest.java?rev=887755&r1=887754&r2=887755&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/actor/ActorTest.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/actor/ActorTest.java Sun Dec 6 19:54:02 2009
@@ -17,6 +17,8 @@
import org.apache.activemq.dispatch.DispatcherConfig;
import org.apache.activemq.dispatch.internal.advanced.AdvancedDispatcher;
+import static org.apache.activemq.dispatch.DispatchOption.*;
+
/**
* ActorTest
@@ -49,7 +51,7 @@
Dispatcher advancedSystem = new AdvancedDispatcher(new DispatcherConfig());
advancedSystem.retain();
- DispatchQueue queue = advancedSystem.createSerialQueue("test");
+ DispatchQueue queue = advancedSystem.createSerialQueue("test", STICK_TO_CALLER_THREAD);
ActorTestObject testObject = Actor.create(new ActorTestObject(), queue);
CountDownLatch latch = new CountDownLatch(1);
Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/DispatchSystemTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/DispatchSystemTest.java?rev=887755&r1=887754&r2=887755&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/DispatchSystemTest.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/DispatchSystemTest.java Sun Dec 6 19:54:02 2009
@@ -36,7 +36,7 @@
Dispatcher advancedSystem = new AdvancedDispatcher(new DispatcherConfig());
advancedSystem.retain();
benchmark("advanced global queue", advancedSystem, advancedSystem.getGlobalQueue(DEFAULT));
- benchmark("advanced private serial queue", advancedSystem, advancedSystem.createSerialQueue("test"));
+ benchmark("advanced private serial queue", advancedSystem, advancedSystem.createSerialQueue("test", DispatchOption.STICK_TO_CALLER_THREAD));
RunnableCountDownLatch latch = new RunnableCountDownLatch(1);
advancedSystem.addShutdownWatcher(latch);
@@ -47,7 +47,7 @@
simpleSystem.retain();
benchmark("simple global queue", simpleSystem, simpleSystem.getGlobalQueue(DEFAULT));
- benchmark("simple private serial queue", simpleSystem, simpleSystem.createSerialQueue("test"));
+ benchmark("simple private serial queue", simpleSystem, simpleSystem.createSerialQueue("test", DispatchOption.STICK_TO_CALLER_THREAD));
latch = new RunnableCountDownLatch(1);
advancedSystem.addShutdownWatcher(latch);
Modified: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/internal/advanced/DispatcherPoolTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/internal/advanced/DispatcherPoolTest.java?rev=887755&r1=887754&r2=887755&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/internal/advanced/DispatcherPoolTest.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/internal/advanced/DispatcherPoolTest.java Sun Dec 6 19:54:02 2009
@@ -21,8 +21,8 @@
import org.apache.activemq.dispatch.DispatchQueue;
import org.apache.activemq.dispatch.DispatcherConfig;
-
import static java.lang.String.*;
+import static org.apache.activemq.dispatch.DispatchOption.*;
/**
*
@@ -64,7 +64,7 @@
private Work(CountDownLatch counter, AdvancedDispatcher dispatcher) {
this.counter = counter;
- dispatchQueue = dispatcher.createSerialQueue("test");
+ dispatchQueue = dispatcher.createSerialQueue("test", STICK_TO_CALLER_THREAD);
}
public void run() {
Modified: activemq/sandbox/activemq-apollo-actor/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java?rev=887755&r1=887754&r2=887755&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java Sun Dec 6 19:54:02 2009
@@ -38,9 +38,9 @@
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.ProducerId;
-import org.apache.activemq.dispatch.Dispatcher;
import org.apache.activemq.dispatch.DispatchPriority;
import org.apache.activemq.dispatch.DispatchQueue;
+import org.apache.activemq.dispatch.Dispatcher;
import org.apache.activemq.dispatch.DispatcherConfig;
import org.apache.activemq.flow.AbstractLimitedFlowResource;
import org.apache.activemq.flow.Flow;
@@ -61,6 +61,8 @@
import org.apache.activemq.queue.SingleFlowRelay;
import org.apache.activemq.queue.Subscription;
+import static org.apache.activemq.dispatch.DispatchOption.*;
+
public class SharedQueuePerfTest extends TestCase {
private static int PERFORMANCE_SAMPLES = 5;
@@ -308,7 +310,7 @@
sendRate.name("Producer " + name + " Rate");
totalProducerRate.add(sendRate);
- dispatchQueue = dispatcher.createSerialQueue(name);
+ dispatchQueue = dispatcher.createSerialQueue(name, STICK_TO_CALLER_THREAD);
dispatchTask = new Runnable(){
public void run() {
dispatch();
Modified: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/AbstractFlowQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/AbstractFlowQueue.java?rev=887755&r1=887754&r2=887755&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/AbstractFlowQueue.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/main/java/org/apache/activemq/queue/AbstractFlowQueue.java Sun Dec 6 19:54:02 2009
@@ -19,11 +19,13 @@
import java.util.ArrayList;
import java.util.Collection;
-import org.apache.activemq.dispatch.DispatchQueue;
import org.apache.activemq.dispatch.DispatchPriority;
+import org.apache.activemq.dispatch.DispatchQueue;
import org.apache.activemq.dispatch.Dispatcher;
import org.apache.activemq.flow.ISinkController.FlowControllable;
+import static org.apache.activemq.dispatch.DispatchOption.*;
+
/**
* Base class for a {@link FlowControllable}
* {@link IFlowQueue}.
@@ -135,7 +137,7 @@
public synchronized void setDispatcher(Dispatcher dispatcher) {
this.dispatcher = dispatcher;
- dispatchQueue = dispatcher.createSerialQueue(getResourceName());
+ dispatchQueue = dispatcher.createSerialQueue(getResourceName(), STICK_TO_CALLER_THREAD);
dispatchTask = new Runnable(){
public void run() {
if( pollingDispatch() ) {
Modified: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/RemoteProducer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/RemoteProducer.java?rev=887755&r1=887754&r2=887755&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/RemoteProducer.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/RemoteProducer.java Sun Dec 6 19:54:02 2009
@@ -11,6 +11,8 @@
import org.apache.activemq.metric.MetricAggregator;
import org.apache.activemq.metric.MetricCounter;
+import static org.apache.activemq.dispatch.DispatchOption.*;
+
public class RemoteProducer extends ClientConnection implements FlowUnblockListener<Message> {
private final MetricCounter rate = new MetricCounter();
@@ -48,7 +50,7 @@
super.start();
outboundController = outputQueue.getFlowController(outboundFlow);
- dispatchQueue = getDispatcher().createSerialQueue(name + "-client");
+ dispatchQueue = getDispatcher().createSerialQueue(name + "-client", STICK_TO_CALLER_THREAD);
dispatchTask = new Runnable(){
public void run() {
dispatch();
Modified: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java?rev=887755&r1=887754&r2=887755&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java Sun Dec 6 19:54:02 2009
@@ -14,7 +14,6 @@
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.dispatch.DispatchPriority;
-import org.apache.activemq.dispatch.Dispatcher;
import org.apache.activemq.dispatch.DispatchQueue;
import org.apache.activemq.dispatch.Dispatcher;
import org.apache.activemq.dispatch.internal.RunnableCountDownLatch;
@@ -34,6 +33,8 @@
import org.apache.activemq.wireformat.WireFormat;
import org.apache.activemq.wireformat.WireFormatFactory;
+import static org.apache.activemq.dispatch.DispatchOption.*;
+
public class PipeTransportFactory extends TransportFactory {
static private final Object EOF_TOKEN = new Object();
@@ -83,7 +84,7 @@
}
public void setDispatcher(Dispatcher dispatcher) {
- dispatchQueue = dispatcher.createSerialQueue(name);
+ dispatchQueue = dispatcher.createSerialQueue(name, STICK_TO_CALLER_THREAD);
dispatchTask = new Runnable(){
public void run() {
dispatch();