You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/12/03 22:33:01 UTC
svn commit: r886928 - in
/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src:
main/java/org/apache/activemq/dispatch/
main/java/org/apache/activemq/dispatch/internal/
main/java/org/apache/activemq/dispatch/internal/advanced/ main/java/org/a...
Author: chirino
Date: Thu Dec 3 21:32:56 2009
New Revision: 886928
URL: http://svn.apache.org/viewvc?rev=886928&view=rev
Log:
Adding some GCD (Grand Central Dispatch)-inspired dispatch interfaces.
Added:
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchObject.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchQueue.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchSource.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchSystem.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AbstractPooledDispatcher.java
- copied, changed from r886891, activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/AbstractPooledDispatcher.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherAware.java
- copied, changed from r886891, activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatcherAware.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/ExecutionLoadBalancer.java
- copied, changed from r886891, activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/ExecutionLoadBalancer.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/IDispatcher.java
- copied, changed from r886891, activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/IDispatcher.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/PooledDispatcher.java
- copied, changed from r886891, activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/PooledDispatcher.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/PooledPriorityDispatcher.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/PriorityDispatcher.java
- copied, changed from r886891, activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/PriorityDispatcher.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/SimpleLoadBalancer.java
- copied, changed from r886891, activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/SimpleLoadBalancer.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/AbstractDispatchObject.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/Dispatcher.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/GlobalDispatchQueue.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/QueueSupport.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SerialDispatchQueue.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleDispatchSystem.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/DispatchSystemTest.java
Removed:
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/AbstractPooledDispatcher.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatcherAware.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/ExecutionLoadBalancer.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/IDispatcher.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/PooledDispatcher.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/PriorityDispatcher.java
activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/SimpleLoadBalancer.java
Added: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchObject.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchObject.java?rev=886928&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchObject.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchObject.java Thu Dec 3 21:32:56 2009
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.dispatch;
+
+/**
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public interface DispatchObject {
+
+ public <Context> Context getContext();
+ public <Context> void setContext(Context context);
+ public void suspend();
+ public void resume();
+
+ public void setFinalizer(Runnable finalizer);
+ public void setTargetQueue(DispatchQueue queue);
+}
Added: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchQueue.java?rev=886928&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchQueue.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchQueue.java Thu Dec 3 21:32:56 2009
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.dispatch;
+
+/**
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public interface DispatchQueue extends DispatchObject {
+
+ public void dispatchAsync(Runnable runnable);
+ public void dispatchSync(Runnable runnable) throws InterruptedException;
+
+ public void dispatchAfter(long delayMS, Runnable runnable);
+ public void dispatchApply(int iterations, Runnable runnable) throws InterruptedException;
+
+ String getLabel();
+
+
+}
Added: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchSource.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchSource.java?rev=886928&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchSource.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchSource.java Thu Dec 3 21:32:56 2009
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.dispatch;
+
+/**
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public interface DispatchSource extends DispatchObject {
+
+ public void cancel();
+ public boolean isCanceled();
+
+ public long getData();
+
+ public long getMask();
+ public void setMask(long mask);
+
+ public void setCancelHandler(Runnable cancelHandler);
+ public void setEventHandler(Runnable eventHandler);
+
+
+}
Added: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchSystem.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchSystem.java?rev=886928&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchSystem.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatchSystem.java Thu Dec 3 21:32:56 2009
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.dispatch;
+
+import java.nio.channels.SelectableChannel;
+
+import org.apache.activemq.dispatch.internal.simple.SimpleDispatchSystem;
+
+/**
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class DispatchSystem {
+
+ private static final SimpleDispatchSystem system = new SimpleDispatchSystem(Runtime.getRuntime().availableProcessors());
+
+ static DispatchQueue getMainQueue() {
+ return system.getMainQueue();
+ }
+
+ public static enum DispatchQueuePriority {
+ HIGH,
+ DEFAULT,
+ LOW;
+ }
+
+ static public DispatchQueue getGlobalQueue(DispatchQueuePriority priority) {
+ return system.getGlobalQueue(priority);
+ }
+
+ static DispatchQueue createQueue(String label) {
+ return system.createQueue(label);
+ }
+
+ static DispatchQueue getCurrentQueue() {
+ return system.getCurrentQueue();
+ }
+
+ static void dispatchMain() {
+ system.dispatchMain();
+ }
+
+ static DispatchSource createSource(SelectableChannel channel, int interestOps, DispatchQueue queue) {
+ return system.createSource(channel, interestOps, queue);
+ }
+
+
+
+}
Copied: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AbstractPooledDispatcher.java (from r886891, activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/AbstractPooledDispatcher.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AbstractPooledDispatcher.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AbstractPooledDispatcher.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/AbstractPooledDispatcher.java&r1=886891&r2=886928&rev=886928&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/AbstractPooledDispatcher.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/AbstractPooledDispatcher.java Thu Dec 3 21:32:56 2009
@@ -14,20 +14,20 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.activemq.dispatch;
+package org.apache.activemq.dispatch.internal.advanced;
import java.util.ArrayList;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-public abstract class AbstractPooledDispatcher<D extends IDispatcher> implements IDispatcher, PooledDispatcher<D> {
+public abstract class AbstractPooledDispatcher implements IDispatcher, PooledDispatcher {
private final String name;
- private final ThreadLocal<D> dispatcher = new ThreadLocal<D>();
- private final ThreadLocal<PooledDispatchContext<D>> dispatcherContext = new ThreadLocal<PooledDispatchContext<D>>();
- private final ArrayList<D> dispatchers = new ArrayList<D>();
+ private final ThreadLocal<IDispatcher> dispatcher = new ThreadLocal<IDispatcher>();
+ private final ThreadLocal<PooledDispatchContext> dispatcherContext = new ThreadLocal<PooledDispatchContext>();
+ private final ArrayList<IDispatcher> dispatchers = new ArrayList<IDispatcher>();
final AtomicBoolean started = new AtomicBoolean();
final AtomicBoolean shutdown = new AtomicBoolean();
@@ -35,12 +35,12 @@
private int roundRobinCounter = 0;
private int size;
- protected ExecutionLoadBalancer<D> loadBalancer;
+ protected ExecutionLoadBalancer loadBalancer;
protected AbstractPooledDispatcher(String name, int size) {
this.name = name;
this.size = size;
- loadBalancer = new SimpleLoadBalancer<D>();
+ loadBalancer = new SimpleLoadBalancer();
}
/**
@@ -52,10 +52,10 @@
* The pool.
* @return The new dispatcher.
*/
- protected abstract D createDispatcher(String name, AbstractPooledDispatcher<D> pool) throws Exception;
+ protected abstract IDispatcher createDispatcher(String name, AbstractPooledDispatcher pool) throws Exception;
/**
- * @see org.apache.activemq.dispatch.IDispatcher#start()
+ * @see org.apache.activemq.dispatch.internal.advanced.IDispatcher#start()
*/
public synchronized final void start() throws Exception {
loadBalancer.start();
@@ -63,7 +63,7 @@
// Create all the workers.
try {
for (int i = 0; i < size; i++) {
- D dispatacher = createDispatcher(name + "-" + (i + 1), this);
+ IDispatcher dispatacher = createDispatcher(name + "-" + (i + 1), this);
dispatchers.add(dispatacher);
dispatacher.start();
@@ -98,11 +98,11 @@
loadBalancer.stop();
}
- public void setCurrentDispatchContext(PooledDispatchContext<D> context) {
+ public void setCurrentDispatchContext(PooledDispatchContext context) {
dispatcherContext.set(context);
}
- public PooledDispatchContext<D> getCurrentDispatchContext() {
+ public PooledDispatchContext getCurrentDispatchContext() {
return dispatcherContext.get();
}
@@ -112,7 +112,7 @@
*
* @return The currently executing dispatcher
*/
- public D getCurrentDispatcher() {
+ public IDispatcher getCurrentDispatcher() {
return dispatcher.get();
}
@@ -120,19 +120,19 @@
* A Dispatcher must call this to indicate that it has started its dispatch
* loop.
*/
- public void onDispatcherStarted(D d) {
+ public void onDispatcherStarted(IDispatcher d) {
dispatcher.set(d);
loadBalancer.onDispatcherStarted(d);
}
- public ExecutionLoadBalancer<D> getLoadBalancer() {
+ public ExecutionLoadBalancer getLoadBalancer() {
return loadBalancer;
}
/**
* A Dispatcher must call this when exiting its dispatch loop
*/
- public void onDispatcherStopped(D d) {
+ public void onDispatcherStopped(IDispatcher d) {
synchronized (dispatchers) {
if (dispatchers.remove(d)) {
size--;
@@ -141,8 +141,8 @@
loadBalancer.onDispatcherStopped(d);
}
- protected D chooseDispatcher() {
- D d = dispatcher.get();
+ protected IDispatcher chooseDispatcher() {
+ IDispatcher d = dispatcher.get();
if (d == null) {
synchronized (dispatchers) {
if(dispatchers.isEmpty())
Copied: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherAware.java (from r886891, activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatcherAware.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherAware.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherAware.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatcherAware.java&r1=886891&r2=886928&rev=886928&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatcherAware.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/DispatcherAware.java Thu Dec 3 21:32:56 2009
@@ -1,4 +1,4 @@
-package org.apache.activemq.dispatch;
+package org.apache.activemq.dispatch.internal.advanced;
/**
* Handy interface to signal classes which would like an IDispatcher instance
Copied: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/ExecutionLoadBalancer.java (from r886891, activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/ExecutionLoadBalancer.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/ExecutionLoadBalancer.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/ExecutionLoadBalancer.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/ExecutionLoadBalancer.java&r1=886891&r2=886928&rev=886928&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/ExecutionLoadBalancer.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/ExecutionLoadBalancer.java Thu Dec 3 21:32:56 2009
@@ -14,15 +14,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.activemq.dispatch;
+package org.apache.activemq.dispatch.internal.advanced;
-import org.apache.activemq.dispatch.IDispatcher.DispatchContext;
-import org.apache.activemq.dispatch.PooledDispatcher.PooledDispatchContext;
+import org.apache.activemq.dispatch.internal.advanced.IDispatcher.DispatchContext;
+import org.apache.activemq.dispatch.internal.advanced.PooledDispatcher.PooledDispatchContext;
-public interface ExecutionLoadBalancer<D extends IDispatcher> {
+public interface ExecutionLoadBalancer {
- public interface ExecutionTracker<D extends IDispatcher> {
+ public interface ExecutionTracker {
/**
* Should be called when a {@link DispatchContext#requestDispatch()} is called.
@@ -30,7 +30,7 @@
* @param caller The calling dispatcher
* @param context The context from which the dispatch is requested.
*/
- public void onDispatchRequest(D caller, PooledDispatchContext<D> context);
+ public void onDispatchRequest(IDispatcher caller, PooledDispatchContext context);
/**
* Must be called by the dispatcher when a {@link DispatchContext} is closed.
@@ -42,20 +42,20 @@
* Must be called by a dispatch thread when it starts
* @param dispatcher The dispatcher
*/
- public void onDispatcherStarted(D dispatcher);
+ public void onDispatcherStarted(IDispatcher dispatcher);
/**
* Must be called by a dispatch thread when it stops
* @param dispatcher The dispatcher
*/
- public void onDispatcherStopped(D dispatcher);
+ public void onDispatcherStopped(IDispatcher dispatcher);
/**
* Gets an {@link ExecutionTracker} for the dispatch context.
* @param context
* @return
*/
- public ExecutionTracker<D> createExecutionTracker(PooledDispatchContext<D> context);
+ public ExecutionTracker createExecutionTracker(PooledDispatchContext context);
/**
* Starts execution tracking
Copied: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/IDispatcher.java (from r886891, activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/IDispatcher.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/IDispatcher.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/IDispatcher.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/IDispatcher.java&r1=886891&r2=886928&rev=886928&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/IDispatcher.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/IDispatcher.java Thu Dec 3 21:32:56 2009
@@ -14,13 +14,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.activemq.dispatch;
+package org.apache.activemq.dispatch.internal.advanced;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
-public interface IDispatcher extends Executor{
+public interface IDispatcher extends Executor {
/**
* This interface is implemented by Dispatchable entities. A Dispatchable
Copied: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/PooledDispatcher.java (from r886891, activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/PooledDispatcher.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/PooledDispatcher.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/PooledDispatcher.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/PooledDispatcher.java&r1=886891&r2=886928&rev=886928&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/PooledDispatcher.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/PooledDispatcher.java Thu Dec 3 21:32:56 2009
@@ -14,23 +14,23 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.activemq.dispatch;
+package org.apache.activemq.dispatch.internal.advanced;
-import org.apache.activemq.dispatch.ExecutionLoadBalancer.ExecutionTracker;
-import org.apache.activemq.dispatch.IDispatcher.DispatchContext;
+import org.apache.activemq.dispatch.internal.advanced.ExecutionLoadBalancer.ExecutionTracker;
+import org.apache.activemq.dispatch.internal.advanced.IDispatcher.DispatchContext;
-public interface PooledDispatcher<D extends IDispatcher> {
+public interface PooledDispatcher {
/**
* A {@link PooledDispatchContext} can be moved between different
* dispatchers.
*/
- public interface PooledDispatchContext<D extends IDispatcher> extends DispatchContext {
+ public interface PooledDispatchContext extends DispatchContext {
/**
* Called to transfer a {@link PooledDispatchContext} to a new
* Dispatcher.
*/
- public void assignToNewDispatcher(D newDispatcher);
+ public void assignToNewDispatcher(IDispatcher newDispatcher);
/**
* Gets the dispatcher to which this PooledDispatchContext currently
@@ -38,27 +38,27 @@
*
* @return
*/
- public D getDispatcher();
+ public IDispatcher getDispatcher();
/**
* Gets the execution tracker for the context.
*
* @return the execution tracker for the context:
*/
- public ExecutionTracker<D> getExecutionTracker();
+ public ExecutionTracker getExecutionTracker();
}
/**
* A Dispatcher must call this from its dispatcher thread to indicate that
* its dispatch loop has started.
*/
- public void onDispatcherStarted(D dispatcher);
+ public void onDispatcherStarted(IDispatcher dispatcher);
/**
* A Dispatcher must call this from its dispatcher thread when exiting its
* dispatch loop.
*/
- public void onDispatcherStopped(D dispatcher);
+ public void onDispatcherStopped(IDispatcher dispatcher);
/**
* Returns the currently executing dispatcher, or null if the current thread
@@ -66,16 +66,16 @@
*
* @return The currently executing dispatcher
*/
- public D getCurrentDispatcher();
+ public IDispatcher getCurrentDispatcher();
- public void setCurrentDispatchContext(PooledDispatchContext<D> context);
+ public void setCurrentDispatchContext(PooledDispatchContext context);
- public PooledDispatchContext<D> getCurrentDispatchContext();
+ public PooledDispatchContext getCurrentDispatchContext();
/**
* Returns the load balancer for this dispatch pool.
*
* @return
*/
- public ExecutionLoadBalancer<D> getLoadBalancer();
+ public ExecutionLoadBalancer getLoadBalancer();
}
Added: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/PooledPriorityDispatcher.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/PooledPriorityDispatcher.java?rev=886928&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/PooledPriorityDispatcher.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/PooledPriorityDispatcher.java Thu Dec 3 21:32:56 2009
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.dispatch.internal.advanced;
+
+import java.util.concurrent.Executor;
+
+final class PooledPriorityDispatcher extends AbstractPooledDispatcher {
+ private final int numPriorities;
+
+ PooledPriorityDispatcher(String name, int size, int numPriorities) {
+ super(name, size);
+ this.numPriorities = numPriorities;
+ }
+
+ @Override
+ protected final PriorityDispatcher createDispatcher(String name, AbstractPooledDispatcher pool) throws Exception {
+ // TODO Auto-generated method stub
+ return new PriorityDispatcher(name, numPriorities, this);
+ }
+
+ public PriorityDispatcher chooseDispatcher() {
+ return ((PriorityDispatcher)super.chooseDispatcher());
+ }
+
+ public final Executor createPriorityExecutor(final int priority) {
+ return new Executor() {
+ public void execute(final Runnable runnable) {
+ chooseDispatcher().dispatch(new RunnableAdapter(runnable), priority);
+ }
+
+ };
+ }
+
+ public int getDispatchPriorities() {
+ // TODO Auto-generated method stub
+ return numPriorities;
+ }
+
+ public void execute(Runnable command) {
+ chooseDispatcher().dispatch(new RunnableAdapter(command), 0);
+ }
+
+}
\ No newline at end of file
Copied: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/PriorityDispatcher.java (from r886891, activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/PriorityDispatcher.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/PriorityDispatcher.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/PriorityDispatcher.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/PriorityDispatcher.java&r1=886891&r2=886928&rev=886928&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/PriorityDispatcher.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/PriorityDispatcher.java Thu Dec 3 21:32:56 2009
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.activemq.dispatch;
+package org.apache.activemq.dispatch.internal.advanced;
import java.util.ArrayList;
import java.util.HashSet;
@@ -25,15 +25,15 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.activemq.dispatch.ExecutionLoadBalancer.ExecutionTracker;
-import org.apache.activemq.dispatch.PooledDispatcher.PooledDispatchContext;
+import org.apache.activemq.dispatch.internal.advanced.ExecutionLoadBalancer.ExecutionTracker;
+import org.apache.activemq.dispatch.internal.advanced.PooledDispatcher.PooledDispatchContext;
import org.apache.activemq.util.Mapper;
import org.apache.activemq.util.PriorityLinkedList;
import org.apache.activemq.util.TimerHeap;
import org.apache.activemq.util.list.LinkedNode;
import org.apache.activemq.util.list.LinkedNodeList;
-public class PriorityDispatcher<D extends PriorityDispatcher<D>> implements Runnable, IDispatcher {
+public class PriorityDispatcher implements Runnable, IDispatcher {
private static final boolean DEBUG = false;
private Thread thread;
@@ -43,7 +43,7 @@
protected final HashSet<PriorityDispatchContext> contexts = new HashSet<PriorityDispatchContext>();
// Set if this dispatcher is part of a dispatch pool:
- protected final PooledDispatcher<D> pooledDispatcher;
+ protected final PooledDispatcher pooledDispatcher;
// The local dispatch queue:
protected final PriorityLinkedList<PriorityDispatchContext> priorityQueue;
@@ -71,7 +71,7 @@
}
};
- protected PriorityDispatcher(String name, int priorities, PooledDispatcher<D> pooledDispactcher) {
+ protected PriorityDispatcher(String name, int priorities, PooledDispatcher pooledDispactcher) {
this.name = name;
MAX_USER_PRIORITY = priorities - 1;
priorityQueue = new PriorityLinkedList<PriorityDispatchContext>(MAX_USER_PRIORITY + 1, PRIORITY_MAPPER);
@@ -87,31 +87,7 @@
}
public static final IDispatcher createPriorityDispatchPool(String name, final int numPriorities, int size) {
- return new AbstractPooledDispatcher<PriorityDispatcher>(name, size) {
-
- @Override
- protected final PriorityDispatcher createDispatcher(String name, AbstractPooledDispatcher<PriorityDispatcher> pool) throws Exception {
- // TODO Auto-generated method stub
- return new PriorityDispatcher(name, numPriorities, this);
- }
-
- public final Executor createPriorityExecutor(final int priority) {
- return new Executor() {
- public void execute(final Runnable runnable) {
- chooseDispatcher().dispatch(new RunnableAdapter(runnable), priority);
- }
- };
- }
-
- public int getDispatchPriorities() {
- // TODO Auto-generated method stub
- return numPriorities;
- }
-
- public void execute(Runnable command) {
- chooseDispatcher().dispatch(new RunnableAdapter(command), 0);
- }
- };
+ return new PooledPriorityDispatcher(name, size, numPriorities);
}
@SuppressWarnings("unchecked")
@@ -217,7 +193,7 @@
if (pooledDispatcher != null) {
// Inform the dispatcher that we have started:
- pooledDispatcher.onDispatcherStarted((D) this);
+ pooledDispatcher.onDispatcherStarted((PriorityDispatcher) this);
}
PriorityDispatchContext pdc;
@@ -291,7 +267,7 @@
thrown.printStackTrace();
} finally {
if (pooledDispatcher != null) {
- pooledDispatcher.onDispatcherStopped((D) this);
+ pooledDispatcher.onDispatcherStopped((PriorityDispatcher) this);
}
cleanup();
}
@@ -435,25 +411,25 @@
return name;
}
- private final D getCurrentDispatcher() {
+ private final PriorityDispatcher getCurrentDispatcher() {
if (pooledDispatcher != null) {
- return pooledDispatcher.getCurrentDispatcher();
+ return (PriorityDispatcher) pooledDispatcher.getCurrentDispatcher();
} else if (Thread.currentThread() == thread) {
- return (D) this;
+ return (PriorityDispatcher) this;
} else {
return null;
}
}
- private final PooledDispatchContext<D> getCurrentDispatchContext() {
+ private final PooledDispatchContext getCurrentDispatchContext() {
return pooledDispatcher.getCurrentDispatchContext();
}
/**
*
*/
- protected class PriorityDispatchContext extends LinkedNode<PriorityDispatchContext> implements PooledDispatchContext<D> {
+ protected class PriorityDispatchContext extends LinkedNode<PriorityDispatchContext> implements PooledDispatchContext {
// The dispatchable target:
private final Dispatchable dispatchable;
// The name of this context:
@@ -466,9 +442,9 @@
// from foreign threads:
final UpdateEvent updateEvent[];
- private final ExecutionTracker<D> tracker;
- protected D currentOwner;
- private D updateDispatcher = null;
+ private final ExecutionTracker tracker;
+ protected PriorityDispatcher currentOwner;
+ private PriorityDispatcher updateDispatcher = null;
private int priority;
private boolean dispatchRequested = false;
@@ -478,9 +454,9 @@
protected PriorityDispatchContext(Dispatchable dispatchable, boolean persistent, String name) {
this.dispatchable = dispatchable;
this.name = name;
- this.currentOwner = (D) PriorityDispatcher.this;
+ this.currentOwner = (PriorityDispatcher) PriorityDispatcher.this;
if (persistent && pooledDispatcher != null) {
- this.tracker = pooledDispatcher.getLoadBalancer().createExecutionTracker((PooledDispatchContext<D>) this);
+ this.tracker = pooledDispatcher.getLoadBalancer().createExecutionTracker((PooledDispatchContext) this);
} else {
this.tracker = null;
}
@@ -490,8 +466,7 @@
currentOwner.takeOwnership(this);
}
- @SuppressWarnings("unchecked")
- private final PriorityDispatcher<D>.UpdateEvent[] createUpdateEvent() {
+ private final PriorityDispatcher.UpdateEvent[] createUpdateEvent() {
return new PriorityDispatcher.UpdateEvent[2];
}
@@ -500,7 +475,7 @@
*
* @return the execution tracker for the context:
*/
- public ExecutionTracker<D> getExecutionTracker() {
+ public ExecutionTracker getExecutionTracker() {
return tracker;
}
@@ -513,7 +488,7 @@
return dispatchable.dispatch();
}
- public final void assignToNewDispatcher(D newDispatcher) {
+ public final void assignToNewDispatcher(IDispatcher newDispatcher) {
synchronized (this) {
// If we're already set to this dispatcher
@@ -523,7 +498,7 @@
}
}
- updateDispatcher = newDispatcher;
+ updateDispatcher = (PriorityDispatcher) newDispatcher;
if (DEBUG)
System.out.println(getName() + " updating to " + updateDispatcher);
@@ -534,7 +509,7 @@
public void requestDispatch() {
- D callingDispatcher = getCurrentDispatcher();
+ PriorityDispatcher callingDispatcher = getCurrentDispatcher();
if (tracker != null)
tracker.onDispatchRequest(callingDispatcher, getCurrentDispatchContext());
@@ -574,7 +549,7 @@
if (this.priority == priority) {
return;
}
- D callingDispatcher = getCurrentDispatcher();
+ PriorityDispatcher callingDispatcher = getCurrentDispatcher();
// Otherwise this is coming off another thread, so we need to
// synchronize to protect against ownership changes:
@@ -647,7 +622,7 @@
* @param newDispatcher
* The new Dispatcher
*/
- protected void switchedDispatcher(D oldDispatcher, D newDispatcher) {
+ protected void switchedDispatcher(PriorityDispatcher oldDispatcher, PriorityDispatcher newDispatcher) {
}
@@ -656,7 +631,7 @@
}
public void close(boolean sync) {
- D callingDispatcher = getCurrentDispatcher();
+ PriorityDispatcher callingDispatcher = getCurrentDispatcher();
// System.out.println(this + "Closing");
synchronized (this) {
closed = true;
@@ -695,13 +670,14 @@
return dispatchable;
}
- public D getDispatcher() {
+ public PriorityDispatcher getDispatcher() {
return currentOwner;
}
public String getName() {
return name;
}
+
}
public String getName() {
Copied: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/SimpleLoadBalancer.java (from r886891, activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/SimpleLoadBalancer.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/SimpleLoadBalancer.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/SimpleLoadBalancer.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/SimpleLoadBalancer.java&r1=886891&r2=886928&rev=886928&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/SimpleLoadBalancer.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/advanced/SimpleLoadBalancer.java Thu Dec 3 21:32:56 2009
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.activemq.dispatch;
+package org.apache.activemq.dispatch.internal.advanced;
import java.util.ArrayList;
import java.util.HashMap;
@@ -22,16 +22,16 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.activemq.dispatch.PooledDispatcher.PooledDispatchContext;
+import org.apache.activemq.dispatch.internal.advanced.PooledDispatcher.PooledDispatchContext;
-public class SimpleLoadBalancer<D extends IDispatcher> implements ExecutionLoadBalancer<D> {
+public class SimpleLoadBalancer implements ExecutionLoadBalancer {
private final boolean DEBUG = false;
//TODO: Added plumbing for periodic rebalancing which we should
//consider implementing
private static final boolean ENABLE_UPDATES = false;
- private final ArrayList<D> dispatchers = new ArrayList<D>();
+ private final ArrayList<IDispatcher> dispatchers = new ArrayList<IDispatcher>();
private AtomicBoolean running = new AtomicBoolean(false);
private boolean needsUpdate = false;
@@ -86,7 +86,7 @@
running.compareAndSet(true, false);
}
- public synchronized final void onDispatcherStarted(D dispatcher) {
+ public synchronized final void onDispatcherStarted(IDispatcher dispatcher) {
dispatchers.add(dispatcher);
scheduleNext();
}
@@ -94,20 +94,20 @@
/**
* A Dispatcher must call this when exiting it's dispatch loop
*/
- public void onDispatcherStopped(D dispatcher) {
+ public void onDispatcherStopped(IDispatcher dispatcher) {
dispatchers.remove(dispatcher);
}
- public ExecutionTracker<D> createExecutionTracker(PooledDispatchContext<D> context) {
+ public ExecutionTracker createExecutionTracker(PooledDispatchContext context) {
return new SimpleExecutionTracker(context);
}
- private static class ExecutionStats<D extends IDispatcher> {
- final PooledDispatchContext<D> target;
- final PooledDispatchContext<D> source;
+ private static class ExecutionStats {
+ final PooledDispatchContext target;
+ final PooledDispatchContext source;
int count;
- ExecutionStats(PooledDispatchContext<D> source, PooledDispatchContext<D> target) {
+ ExecutionStats(PooledDispatchContext source, PooledDispatchContext target) {
this.target = target;
this.source = source;
}
@@ -117,15 +117,15 @@
}
}
- private class SimpleExecutionTracker implements ExecutionTracker<D> {
- private final HashMap<PooledDispatchContext<D>, ExecutionStats<D>> sources = new HashMap<PooledDispatchContext<D>, ExecutionStats<D>>();
- private final PooledDispatchContext<D> context;
+ private class SimpleExecutionTracker implements ExecutionTracker {
+ private final HashMap<PooledDispatchContext, ExecutionStats> sources = new HashMap<PooledDispatchContext, ExecutionStats>();
+ private final PooledDispatchContext context;
private final AtomicInteger work = new AtomicInteger(0);
- private PooledDispatchContext<D> singleSource;
+ private PooledDispatchContext singleSource;
private IDispatcher currentOwner;
- SimpleExecutionTracker(PooledDispatchContext<D> context) {
+ SimpleExecutionTracker(PooledDispatchContext context) {
this.context = context;
currentOwner = context.getDispatcher();
}
@@ -144,7 +144,7 @@
* @return True if this method resulted in the dispatch request being
* assigned to another dispatcher.
*/
- public void onDispatchRequest(D callingDispatcher, PooledDispatchContext<D> callingContext) {
+ public void onDispatchRequest(IDispatcher callingDispatcher, PooledDispatchContext callingContext) {
if (callingContext != null) {
// Make sure we are being called by another node:
@@ -156,7 +156,7 @@
if (singleSource != callingContext) {
if (singleSource == null && sources.isEmpty()) {
singleSource = callingContext;
- ExecutionStats<D> stats = new ExecutionStats<D>(callingContext, context);
+ ExecutionStats stats = new ExecutionStats(callingContext, context);
stats.count++;
sources.put(callingContext, stats);
@@ -172,9 +172,9 @@
} else {
- ExecutionStats<D> stats = sources.get(callingContext);
+ ExecutionStats stats = sources.get(callingContext);
if (stats == null) {
- stats = new ExecutionStats<D>(callingContext, context);
+ stats = new ExecutionStats(callingContext, context);
sources.put(callingContext, stats);
}
stats.count++;
Added: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/AbstractDispatchObject.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/AbstractDispatchObject.java?rev=886928&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/AbstractDispatchObject.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/AbstractDispatchObject.java Thu Dec 3 21:32:56 2009
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.dispatch.internal.simple;
+
+import org.apache.activemq.dispatch.DispatchObject;
+import org.apache.activemq.dispatch.DispatchQueue;
+
+/**
+ * Base class that stores the three settable properties of a dispatch object:
+ * a user context, a finalizer and a target queue. This class only records
+ * the values; it never invokes the finalizer or the target queue itself.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public abstract class AbstractDispatchObject implements DispatchObject {
+
+    /** Last value given to {@code setContext}; {@code null} until one is set. */
+    protected Object context;
+    /** Last value given to {@code setFinalizer}; {@code null} until one is set. */
+    protected Runnable finalizer;
+    /** Last value given to {@code setTargetQueue}; {@code null} until one is set. */
+    protected DispatchQueue targetQueue;
+
+    /**
+     * Returns the stored context cast to the type the caller asks for. The
+     * cast is unchecked, so asking for the wrong type fails at the call site.
+     *
+     * @return the stored context, or {@code null} if none was set
+     */
+    @SuppressWarnings("unchecked")
+    public <T> T getContext() {
+        final T typed = (T) context;
+        return typed;
+    }
+
+    /**
+     * Replaces the stored context.
+     *
+     * @param context the new context value
+     */
+    public <T> void setContext(T context) {
+        this.context = context;
+    }
+
+    /**
+     * Replaces the stored finalizer.
+     *
+     * @param finalizer the new finalizer
+     */
+    public void setFinalizer(Runnable finalizer) {
+        this.finalizer = finalizer;
+    }
+
+    /**
+     * Replaces the stored target queue.
+     *
+     * @param targetQueue the new target queue
+     */
+    public void setTargetQueue(DispatchQueue targetQueue) {
+        this.targetQueue = targetQueue;
+    }
+
+}
Added: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/Dispatcher.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/Dispatcher.java?rev=886928&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/Dispatcher.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/Dispatcher.java Thu Dec 3 21:32:56 2009
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.dispatch.internal.simple;
+
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+/**
+ * Daemon worker thread of a {@link SimpleDispatchSystem}. Each pass walks the
+ * system's global queues in array order, emptying one queue before moving to
+ * the next, and then blocks in {@code waitForWakeup()} when the system's
+ * queued-runnable counter reads zero.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+final public class Dispatcher extends Thread {
+    private final SimpleDispatchSystem system;
+
+    /**
+     * Creates the thread, named {@code "dispatcher:" + (ordinal + 1)} and
+     * marked as a daemon. The thread is not started here.
+     *
+     * @param javaDispatchSystem the system whose global queues this thread serves
+     * @param ordinal zero-based index of this dispatcher
+     */
+    public Dispatcher(SimpleDispatchSystem javaDispatchSystem, int ordinal) {
+        system = javaDispatchSystem;
+        setName("dispatcher:"+(ordinal+1));
+        setDaemon(true);
+    }
+
+    /**
+     * Main loop: drain every global queue, then sleep if nothing is queued.
+     * Returns (ending the thread) when interrupted while waiting.
+     */
+    @Override
+    public void run() {
+        final GlobalDispatchQueue[] queues = system.globalQueues;
+        for (;;) {
+            for (int i = 0; i < queues.length; i++) {
+                drain(queues[i]);
+            }
+            if (system.globalQueuedRunnables.get() != 0) {
+                // More work arrived while draining: go around again.
+                continue;
+            }
+            try {
+                system.waitForWakeup();
+            } catch (InterruptedException e) {
+                return;
+            }
+        }
+    }
+
+    /**
+     * Marks the queue as this thread's current queue and runs everything it
+     * holds, decrementing the system-wide counter once per task taken.
+     */
+    private void drain(GlobalDispatchQueue queue) {
+        SimpleDispatchSystem.CURRENT_QUEUE.set(queue);
+        final ConcurrentLinkedQueue<Runnable> pending = queue.runnables;
+        for (Runnable next = pending.poll(); next != null; next = pending.poll()) {
+            system.globalQueuedRunnables.decrementAndGet();
+            dispatch(next);
+        }
+    }
+
+    /**
+     * Runs one task; anything it throws is printed and otherwise ignored so
+     * that the dispatcher thread keeps running.
+     */
+    private void dispatch(Runnable runnable) {
+        try {
+            runnable.run();
+        } catch (Throwable e) {
+            e.printStackTrace();
+        }
+    }
+
+}
\ No newline at end of file
Added: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/GlobalDispatchQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/GlobalDispatchQueue.java?rev=886928&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/GlobalDispatchQueue.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/GlobalDispatchQueue.java Thu Dec 3 21:32:56 2009
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.dispatch.internal.simple;
+
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import org.apache.activemq.dispatch.DispatchQueue;
+import org.apache.activemq.dispatch.DispatchSystem.DispatchQueuePriority;
+
+/**
+ * One of the shared queues of a {@link SimpleDispatchSystem}. Tasks are held
+ * in a {@link ConcurrentLinkedQueue} that the system's {@code Dispatcher}
+ * threads poll directly. Suspension, context, finalizer and target queue are
+ * not supported on a global queue.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class GlobalDispatchQueue implements DispatchQueue {
+
+    private final SimpleDispatchSystem system;
+    /** Label of this queue: the {@code toString()} of its priority. */
+    final String label;
+    /** Pending tasks; read directly by the dispatcher threads. */
+    final ConcurrentLinkedQueue<Runnable> runnables = new ConcurrentLinkedQueue<Runnable>();
+
+    /**
+     * @param system the owning dispatch system, signalled on every enqueue
+     * @param priority the priority this queue stands for; supplies the label
+     */
+    public GlobalDispatchQueue(SimpleDispatchSystem system, DispatchQueuePriority priority) {
+        this.system = system;
+        this.label=priority.toString();
+    }
+
+    /** @return the label derived from the priority given at construction */
+    public String getLabel() {
+        return label;
+    }
+
+    /**
+     * Enqueues the task and then signals the owning system via
+     * {@code wakeup()}.
+     *
+     * @param runnable the task to run on a dispatcher thread
+     */
+    public void dispatchAsync(Runnable runnable) {
+        runnables.add(runnable);
+        system.wakeup();
+    }
+
+    /**
+     * Not implemented yet.
+     *
+     * @throws UnsupportedOperationException always; this is the same
+     *         exception type the other unsupported operations of this class
+     *         use, and it is still a {@code RuntimeException}
+     */
+    public void dispatchAfter(long delayMS, Runnable runnable) {
+        throw new UnsupportedOperationException("TODO: implement me.");
+    }
+
+    /**
+     * Enqueues the task once and blocks until it has run.
+     * NOTE(review): the wait is released by a dispatcher thread, so calling
+     * this from the only dispatcher thread would presumably never return.
+     * Confirm against the intended usage.
+     *
+     * @throws InterruptedException if the caller is interrupted while waiting
+     */
+    public void dispatchSync(final Runnable runnable) throws InterruptedException {
+        dispatchApply(1, runnable);
+    }
+
+    /**
+     * Enqueues the task {@code iterations} times and blocks until every copy
+     * has run; see {@link QueueSupport#dispatchApply}.
+     *
+     * @throws InterruptedException if the caller is interrupted while waiting
+     */
+    public void dispatchApply(int iterations, final Runnable runnable) throws InterruptedException {
+        QueueSupport.dispatchApply(this, iterations, runnable);
+    }
+
+    /** @throws UnsupportedOperationException always */
+    public void resume() {
+        throw new UnsupportedOperationException();
+    }
+
+    /** @throws UnsupportedOperationException always */
+    public void suspend() {
+        throw new UnsupportedOperationException();
+    }
+
+    /** @throws UnsupportedOperationException always */
+    public <Context> Context getContext() {
+        throw new UnsupportedOperationException();
+    }
+
+    /** @throws UnsupportedOperationException always */
+    public <Context> void setContext(Context context) {
+        throw new UnsupportedOperationException();
+    }
+
+    /** @throws UnsupportedOperationException always */
+    public void setFinalizer(Runnable finalizer) {
+        throw new UnsupportedOperationException();
+    }
+
+    /** @throws UnsupportedOperationException always */
+    public void setTargetQueue(DispatchQueue queue) {
+        throw new UnsupportedOperationException();
+    }
+
+}
Added: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/QueueSupport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/QueueSupport.java?rev=886928&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/QueueSupport.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/QueueSupport.java Thu Dec 3 21:32:56 2009
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.dispatch.internal.simple;
+
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.activemq.dispatch.DispatchQueue;
+
+/**
+ * Helper shared by the queue implementations for their blocking dispatch
+ * operations.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class QueueSupport {
+
+    /**
+     * Submits {@code runnable} to {@code queue} {@code itterations} times via
+     * {@code dispatchAsync} and blocks the caller until every submission has
+     * finished running. A run that throws still counts as finished.
+     *
+     * @param queue the queue that receives the submissions
+     * @param itterations how many times to submit the task
+     * @param runnable the task to run
+     * @throws InterruptedException if the caller is interrupted while waiting
+     */
+    static public void dispatchApply(DispatchQueue queue, int itterations, final Runnable runnable) throws InterruptedException {
+        final CountDownLatch remaining = new CountDownLatch(itterations);
+        final Runnable countingTask = new Runnable() {
+            public void run() {
+                try {
+                    runnable.run();
+                } finally {
+                    // Count down even on failure so the waiter is released.
+                    remaining.countDown();
+                }
+            }
+        };
+        int submitted = 0;
+        while( submitted < itterations ) {
+            queue.dispatchAsync(countingTask);
+            submitted++;
+        }
+        remaining.await();
+    }
+
+}
Added: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SerialDispatchQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SerialDispatchQueue.java?rev=886928&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SerialDispatchQueue.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SerialDispatchQueue.java Thu Dec 3 21:32:56 2009
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.dispatch.internal.simple;
+
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.activemq.dispatch.DispatchQueue;
+
+/**
+ * A queue whose tasks are drained by {@link #run()}. When a target queue is
+ * set, the queue submits itself to it as a {@link Runnable} each time its
+ * size goes from zero to one while not suspended; without a target queue
+ * someone has to call {@link #run()} directly.
+ *
+ * NOTE(review): {@link #resume()} only decrements the suspend counter. Tasks
+ * added while suspended are not submitted to the target queue, and later
+ * {@code dispatchAsync} calls see a size above one, so nothing re-submits the
+ * queue after a resume. A safe fix needs extra scheduling state; confirm the
+ * intended design.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class SerialDispatchQueue extends AbstractDispatchObject implements DispatchQueue, Runnable {
+
+    private final ConcurrentLinkedQueue<Runnable> runnables = new ConcurrentLinkedQueue<Runnable>();
+    final private String label;
+    /** Number of outstanding suspend() calls; draining only happens at <= 0. */
+    final private AtomicInteger suspendCounter = new AtomicInteger();
+    /** Tasks accepted but not yet finished; incremented before the enqueue. */
+    final private AtomicLong size = new AtomicLong();
+
+    /**
+     * @param label the label reported by {@link #getLabel()}
+     */
+    public SerialDispatchQueue(String label) {
+        this.label = label;
+    }
+
+    /** @return the label given at construction */
+    public String getLabel() {
+        return label;
+    }
+
+    /** Decrements the suspend counter. */
+    public void resume() {
+        suspendCounter.decrementAndGet();
+    }
+
+    /** Increments the suspend counter; {@link #run()} stops draining while it is positive. */
+    public void suspend() {
+        suspendCounter.incrementAndGet();
+    }
+
+    /**
+     * Not implemented yet.
+     *
+     * @throws UnsupportedOperationException always (still a
+     *         {@code RuntimeException}, as before)
+     */
+    public void dispatchAfter(long delayMS, Runnable runnable) {
+        throw new UnsupportedOperationException("TODO: implement me.");
+    }
+
+    /**
+     * Adds a task. If this makes the size one, the queue is not suspended and
+     * a target queue is set, the queue submits itself to the target queue.
+     *
+     * @param runnable the task to add
+     * @throws IllegalArgumentException if {@code runnable} is {@code null}
+     */
+    public void dispatchAsync(Runnable runnable) {
+        if( runnable == null ) {
+            throw new IllegalArgumentException();
+        }
+        long lastSize = size.incrementAndGet();
+        runnables.add(runnable);
+        if( targetQueue!=null && lastSize == 1 && suspendCounter.get()<=0 ) {
+            targetQueue.dispatchAsync(this);
+        }
+    }
+
+    /**
+     * Runs queued tasks on the calling thread until the size reaches zero or
+     * the queue is suspended. This queue is published as the thread's current
+     * queue for the duration and the previous value is restored afterwards.
+     * A task that throws is reported with {@code printStackTrace()} and
+     * draining continues.
+     */
+    public void run() {
+        DispatchQueue original = SimpleDispatchSystem.CURRENT_QUEUE.get();
+        SimpleDispatchSystem.CURRENT_QUEUE.set(this);
+        try {
+            long lsize = size.get();
+            while( suspendCounter.get() <= 0 && lsize > 0 ) {
+                Runnable runnable = runnables.poll();
+                if( runnable == null ) {
+                    // size is incremented before the task is added, so the
+                    // task may not be visible yet: retry.
+                    continue;
+                }
+                try {
+                    runnable.run();
+                } catch (Throwable e) {
+                    e.printStackTrace();
+                } finally {
+                    // Account for the task even when it threw. Skipping this
+                    // left size one too high forever, so this loop kept
+                    // spinning on an empty queue and never returned.
+                    lsize = size.decrementAndGet();
+                }
+            }
+        } finally {
+            SimpleDispatchSystem.CURRENT_QUEUE.set(original);
+        }
+    }
+
+    /**
+     * Adds the task once and blocks until it has run.
+     *
+     * @throws InterruptedException if the caller is interrupted while waiting
+     */
+    public void dispatchSync(Runnable runnable) throws InterruptedException {
+        dispatchApply(1, runnable);
+    }
+
+    /**
+     * Adds the task {@code iterations} times and blocks until every copy has
+     * run; see {@link QueueSupport#dispatchApply}.
+     *
+     * @throws InterruptedException if the caller is interrupted while waiting
+     */
+    public void dispatchApply(int iterations, Runnable runnable) throws InterruptedException {
+        QueueSupport.dispatchApply(this, iterations, runnable);
+    }
+}
Added: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleDispatchSystem.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleDispatchSystem.java?rev=886928&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleDispatchSystem.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/internal/simple/SimpleDispatchSystem.java Thu Dec 3 21:32:56 2009
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.dispatch.internal.simple;
+
+import java.nio.channels.SelectableChannel;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.activemq.dispatch.DispatchQueue;
+import org.apache.activemq.dispatch.DispatchSource;
+import org.apache.activemq.dispatch.DispatchSystem.DispatchQueuePriority;
+
+import static org.apache.activemq.dispatch.DispatchSystem.DispatchQueuePriority.*;
+
+
+/**
+ * Implements a simple dispatch system: one global queue per
+ * {@link DispatchQueuePriority}, a "main" serial queue that is drained by
+ * whoever calls {@link #dispatchMain()}, and a fixed set of daemon
+ * {@link Dispatcher} threads that serve the global queues.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class SimpleDispatchSystem {
+
+    /** Queue currently being served by the calling thread, if any. */
+    static final ThreadLocal<DispatchQueue> CURRENT_QUEUE = new ThreadLocal<DispatchQueue>();
+
+    final SerialDispatchQueue mainQueue = new SerialDispatchQueue("main");
+    /** Global queues, indexed by {@code DispatchQueuePriority.ordinal()}. */
+    final GlobalDispatchQueue globalQueues[];
+    final Dispatcher dispatchers[];
+
+    /** Monitor the dispatcher threads sleep on while nothing is queued. */
+    private final Object wakeupMutex = new Object();
+    /** Tasks enqueued on the global queues and not yet taken by a dispatcher. */
+    final AtomicLong globalQueuedRunnables = new AtomicLong();
+
+    /**
+     * Creates the global queues and starts the dispatcher threads.
+     *
+     * @param size number of dispatcher threads to create and start
+     */
+    public SimpleDispatchSystem(int size) {
+        // One queue per declared priority, so that getGlobalQueue() can index
+        // by ordinal for every priority value.
+        DispatchQueuePriority[] priorities = DispatchQueuePriority.values();
+        globalQueues = new GlobalDispatchQueue[priorities.length];
+        for (int i = 0; i < priorities.length; i++) {
+            globalQueues[i] = new GlobalDispatchQueue(this, priorities[i]);
+        }
+
+        dispatchers = new Dispatcher[size];
+        for (int i = 0; i < size; i++) {
+            dispatchers[i] = new Dispatcher(this, i);
+            dispatchers[i].start();
+        }
+    }
+
+    /** @return the "main" serial queue, drained by {@link #dispatchMain()} */
+    public DispatchQueue getMainQueue() {
+        return mainQueue;
+    }
+
+    /**
+     * @param priority the priority whose queue is wanted
+     * @return the global queue stored at {@code priority.ordinal()}
+     */
+    public DispatchQueue getGlobalQueue(DispatchQueuePriority priority) {
+        return globalQueues[priority.ordinal()];
+    }
+
+    /**
+     * Creates a serial queue that targets the {@code DEFAULT} global queue.
+     *
+     * @param label label of the new queue
+     * @return the new queue
+     */
+    public DispatchQueue createQueue(String label) {
+        SerialDispatchQueue rc = new SerialDispatchQueue(label);
+        rc.setTargetQueue(getGlobalQueue(DEFAULT));
+        return rc;
+    }
+
+    /** @return the queue recorded for the calling thread, or {@code null} if none */
+    public DispatchQueue getCurrentQueue() {
+        return CURRENT_QUEUE.get();
+    }
+
+    /** Runs the main queue on the calling thread; see {@code SerialDispatchQueue.run()}. */
+    public void dispatchMain() {
+        mainQueue.run();
+    }
+
+    /**
+     * Not implemented yet.
+     *
+     * @return always {@code null}
+     */
+    public DispatchSource createSource(SelectableChannel channel, int interestOps, DispatchQueue queue) {
+        return null;
+    }
+
+    /**
+     * Blocks the caller until the queued-runnable counter is non-zero.
+     *
+     * @throws InterruptedException if the caller is interrupted while waiting
+     */
+    public void waitForWakeup() throws InterruptedException {
+        // The counter is tested while holding the monitor. wakeup() bumps the
+        // counter before it notifies under the same monitor, so a wakeup can
+        // no longer slip in between the test and the wait() and be lost.
+        synchronized(wakeupMutex) {
+            while( globalQueuedRunnables.get()==0 ) {
+                wakeupMutex.wait();
+            }
+        }
+    }
+
+    /**
+     * Records one more queued runnable and wakes one sleeping dispatcher
+     * unless the backlog already exceeds the number of dispatchers.
+     */
+    void wakeup() {
+        // "<=" rather than "<": with a single dispatcher the first queued
+        // task yields 1, and "1 < 1" never notified the sleeping thread.
+        if( globalQueuedRunnables.incrementAndGet() <= dispatchers.length ) {
+            synchronized(wakeupMutex) {
+                wakeupMutex.notify();
+            }
+        }
+    }
+}
Added: activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/DispatchSystemTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/DispatchSystemTest.java?rev=886928&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/DispatchSystemTest.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dispatcher/src/test/java/org/apache/activemq/dispatch/DispatchSystemTest.java Thu Dec 3 21:32:56 2009
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.dispatch;
+
+import java.util.concurrent.CountDownLatch;
+
+import static java.lang.String.*;
+import static org.apache.activemq.dispatch.DispatchSystem.DispatchQueuePriority.*;
+
+/**
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class DispatchSystemTest {
+
+    /** Iterations executed before timing starts, to warm the JIT up. */
+    private static final int WARM_UP_ITERATIONS = 100000;
+
+    /** Iterations executed inside the timed section. */
+    private static final int TIMED_ITERATIONS = 1000 * 1000 * 20;
+
+    /** How many times the task is handed to the queue up front. */
+    private static final int INITIAL_SUBMISSIONS = 1000;
+
+    /**
+     * Benchmarks a serial queue created with the label "test", then the
+     * global queue of DEFAULT priority.
+     */
+    public static void main(String[] args) throws InterruptedException {
+        DispatchQueue serialQueue = DispatchSystem.createQueue("test");
+        benchmark("private serial queue", serialQueue);
+
+        DispatchQueue globalQueue = DispatchSystem.getGlobalQueue(DEFAULT);
+        benchmark("global queue", globalQueue);
+    }
+
+    /**
+     * Runs an untimed warm-up pass followed by a timed pass on the queue and
+     * prints the elapsed time and the resulting execution rate.
+     *
+     * @param name  label printed with the result
+     * @param queue the queue the work is dispatched to
+     */
+    private static void benchmark(String name, DispatchQueue queue) throws InterruptedException {
+        benchmarkWork(queue, WARM_UP_ITERATIONS);
+
+        final long startedAt = System.nanoTime();
+        benchmarkWork(queue, TIMED_ITERATIONS);
+        final long elapsedNanos = System.nanoTime() - startedAt;
+
+        final double durationMS = elapsedNanos / 1000000d;
+        final double rate = 1000d * TIMED_ITERATIONS / durationMS;
+
+        final String report =
+                format("name: %s, duration: %,.3f ms, rate: %,.2f executions/sec", name, durationMS, rate);
+        System.out.println(report);
+    }
+
+    /**
+     * Submits one shared task to the queue a fixed number of times and blocks
+     * until a latch initialised to {@code iterations} has reached zero.
+     *
+     * @param queue      the queue the task is dispatched to
+     * @param iterations the initial count of the latch
+     */
+    private static void benchmarkWork(final DispatchQueue queue, int iterations) throws InterruptedException {
+        final CountDownLatch remaining = new CountDownLatch(iterations);
+        final Runnable task = new ResubmittingTask(queue, remaining);
+
+        int submitted = 0;
+        while (submitted < INITIAL_SUBMISSIONS) {
+            queue.dispatchAsync(task);
+            submitted++;
+        }
+        remaining.await();
+    }
+
+    /**
+     * Counts the latch down once per execution and dispatches itself again
+     * for as long as the latch is still above zero.
+     */
+    private static final class ResubmittingTask implements Runnable {
+        private final DispatchQueue queue;
+        private final CountDownLatch remaining;
+
+        ResubmittingTask(DispatchQueue queue, CountDownLatch remaining) {
+            this.queue = queue;
+            this.remaining = remaining;
+        }
+
+        public void run() {
+            remaining.countDown();
+            if (remaining.getCount() <= 0) {
+                return;
+            }
+            queue.dispatchAsync(this);
+        }
+    }
+}