You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/02/11 21:12:30 UTC
svn commit: r743476 [1/4] - in /activemq/sandbox/activemq-flow: ./ src/
src/main/ src/main/java/ src/main/java/com/ src/main/java/com/progress/
src/main/java/org/ src/main/java/org/apache/
src/main/java/org/apache/activemq/ src/main/java/org/apache/act...
Author: chirino
Date: Wed Feb 11 20:12:28 2009
New Revision: 743476
URL: http://svn.apache.org/viewvc?rev=743476&view=rev
Log:
Initial spike of the flow module. This is basically an experiment with a different flow-control and threading model to see if we can make a
faster/simpler version of the broker. This code was jointly developed by Colin Macnaughton (yes, he has an ICLA on file) and me.
Will follow up with a post to the dev list to try to entice more eyes to look at this to see if it sparks any interest.
Added:
activemq/sandbox/activemq-flow/pom.xml
activemq/sandbox/activemq-flow/src/
activemq/sandbox/activemq-flow/src/main/
activemq/sandbox/activemq-flow/src/main/java/
activemq/sandbox/activemq-flow/src/main/java/com/
activemq/sandbox/activemq-flow/src/main/java/com/progress/
activemq/sandbox/activemq-flow/src/main/java/org/
activemq/sandbox/activemq-flow/src/main/java/org/apache/
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/ExecutionLoadBalancer.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/IDispatcher.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityDispatcher.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityLinkedList.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityMap.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityPooledDispatcher.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/SimpleLoadBalancer.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/AbstractLimitedFlowResource.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/AbstractLimitedFlowSource.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/AbstractLimiter.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/Flow.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowDrain.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowLimiter.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowResource.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowSink.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowSource.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/ISinkController.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/ISourceController.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/NoOpFlowController.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/PriorityFlowController.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/PrioritySizeLimiter.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/SizeLimiter.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/metric/
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/metric/Metric.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/metric/MetricAggregator.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/metric/MetricCounter.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/metric/Period.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/AbstractFlowQueue.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusivePriorityQueue.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusiveQueue.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IAsynchronousFlowSource.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IBlockingFlowSource.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IFlowQueue.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IPollableFlowSource.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IQueue.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/LoadBalancedFlowQueue.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/Mapper.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/MemoryStore.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/MultiFlowQueue.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/PartitionedQueue.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedPriorityQueue.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedQueue.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SingleFlowPriorityQueue.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/Store.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/Subscription.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/TreeMemoryStore.java
activemq/sandbox/activemq-flow/src/test/
activemq/sandbox/activemq-flow/src/test/java/
activemq/sandbox/activemq-flow/src/test/java/com/
activemq/sandbox/activemq-flow/src/test/java/com/progress/
activemq/sandbox/activemq-flow/src/test/java/org/
activemq/sandbox/activemq-flow/src/test/java/org/apache/
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/AbstractTestConnection.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Destination.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Message.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MessageGenerator.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Pipe.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/TestFlowManager.java
Added: activemq/sandbox/activemq-flow/pom.xml
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/pom.xml?rev=743476&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/pom.xml (added)
+++ activemq/sandbox/activemq-flow/pom.xml Wed Feb 11 20:12:28 2009
@@ -0,0 +1,52 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>activemq-parent</artifactId>
+ <version>5.3-SNAPSHOT</version>
+ </parent>
+
+ <groupId>org.apache.activemq.flow</groupId>
+ <artifactId>activemq-flow</artifactId>
+ <packaging>jar</packaging>
+ <version>1.0-SNAPSHOT</version>
+
+ <name>ActiveMQ :: Flow</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>colt</groupId>
+ <artifactId>colt</artifactId>
+ <version>1.2.0</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>activemq-core</artifactId>
+ </dependency>
+ </dependencies>
+
+</project>
Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/ExecutionLoadBalancer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/ExecutionLoadBalancer.java?rev=743476&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/ExecutionLoadBalancer.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/ExecutionLoadBalancer.java Wed Feb 11 20:12:28 2009
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.dispatch;
+
+import org.apache.activemq.dispatch.IDispatcher.DispatchContext;
+
+public interface ExecutionLoadBalancer {
+
+ /**
+ * A load-balanced dispatch context can be moved between different
+ * dispatchers.
+ */
+ public interface LoadBalancedDispatchContext extends DispatchContext {
+ /**
+ * A dispatcher must call this when it starts dispatch for this context.
+ */
+ public void startingDispatch();
+
+ /**
+ * A dispatcher must call this when it has finished dispatching this
+ * context.
+ */
+ public void finishedDispatch();
+
+ /**
+ * Called on the owning dispatcher's thread to apply updates that were requested by other threads.
+ */
+ public void processForeignUpdates();
+ }
+ /** A dispatch context created by a {@link PoolableDispatcher}; the load balancer wraps it. */
+ public interface PoolableDispatchContext extends DispatchContext {
+ /** Sets the load-balanced context that wraps this context. */
+ public void setLoadBalancedDispatchContext(LoadBalancedDispatchContext context);
+
+ /**
+ * Indicates that another thread has made an update to the dispatch
+ * context.
+ *
+ */
+ public void onForeignThreadUpdate();
+ /** Returns the dispatcher this context belongs to. */
+ public PoolableDispatcher getDispatcher();
+ }
+ /** A dispatcher that can be added to, and managed by, an {@link ExecutionLoadBalancer}. */
+ public interface PoolableDispatcher extends IDispatcher {
+
+ /**
+ * Creates a poolable dispatch context for the given dispatchable. The
+ * returned context is not yet wrapped by the load balancer.
+ * (The spelling of the method name is part of the existing API.)
+ */
+ public PoolableDispatchContext createPoolablDispatchContext(Dispatchable dispatchable, String name);
+ }
+
+ /**
+ * This wraps the dispatch context into one that is load balanced by the
+ * LoadBalancer.
+ *
+ * @param context
+ * The context to wrap.
+ * @return The load-balanced context wrapping the given context.
+ */
+ public LoadBalancedDispatchContext createLoadBalancedDispatchContext(PoolableDispatchContext context);
+
+ /**
+ * Adds a Dispatcher to the list of dispatchers managed by the load balancer.
+ *
+ * @param dispatcher The dispatcher to add.
+ */
+ public void addDispatcher(PoolableDispatcher dispatcher);
+
+ /**
+ * A Dispatcher must call this from its dispatcher thread to indicate that
+ * its dispatch loop has started.
+ */
+ public void onDispatcherStarted(PoolableDispatcher dispatcher);
+
+ /**
+ * A Dispatcher must call this from its dispatcher thread when exiting its
+ * dispatch loop.
+ */
+ public void onDispatcherStopped(PoolableDispatcher dispatcher);
+}
Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/IDispatcher.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/IDispatcher.java?rev=743476&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/IDispatcher.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/IDispatcher.java Wed Feb 11 20:12:28 2009
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.dispatch;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+
+public interface IDispatcher {
+
+ /**
+ * This interface is implemented by Dispatchable entities. A Dispatchable
+ * entity registers with an {@link IDispatcher} and is returned a
+ * {@link DispatchContext} which it can use to request the
+ * {@link IDispatcher} to invoke {@link Dispatchable#dispatch()}.
+ *
+ * {@link IDispatcher} guarantees that it will never invoke
+ * {@link #dispatch()} concurrently unless the {@link Dispatchable} is
+ * registered with more than one {@link IDispatcher}.
+ */
+ public interface Dispatchable {
+ public boolean dispatch();
+ }
+
+ /**
+ * Returned to callers registered with this dispatcher. Used by the caller
+ * to inform the dispatcher that it is ready for dispatch.
+ *
+ * Note that DispatchContext is not safe for concurrent access by multiple
+ * threads.
+ */
+ public interface DispatchContext {
+ /**
+ * Once registered with a dispatcher, this can be called to request
+ * dispatch. The {@link Dispatchable} will remain in the dispatch queue
+ * until a subsequent call to {@link Dispatchable#dispatch()} returns
+ * true (PriorityDispatcher keeps re-dispatching while it returns false).
+ */
+ public void requestDispatch();
+
+ /**
+ * This can be called to update the dispatch priority.
+ *
+ * @param priority The new dispatch priority.
+ */
+ public void updatePriority(int priority);
+
+ /**
+ * Gets the Dispatchable that this context represents.
+ *
+ * @return The dispatchable
+ */
+ public Dispatchable getDispatchable();
+
+ /**
+ * Gets the name of the dispatch context
+ *
+ * @return The name
+ */
+ public String getName();
+
+ /**
+ * This must be called to release any resource the dispatcher is holding
+ * on behalf of this context.
+ */
+ public void close();
+ }
+ /** Adapts a {@link Runnable} to the {@link Dispatchable} interface. */
+ class RunnableAdapter implements Dispatchable {
+ final Runnable runnable;
+
+ RunnableAdapter(Runnable runnable) {
+ this.runnable = runnable;
+ }
+ // Runs the task once and always returns true, so it is not dispatched again.
+ public boolean dispatch() {
+ runnable.run();
+ return true;
+ }
+ }
+
+ /**
+ * Registers a {@link Dispatchable} with this dispatcher, and returns a
+ * {@link DispatchContext} that the caller can use to request dispatch.
+ *
+ * @param dispatchable
+ * The {@link Dispatchable}
+ * @param name
+ * An identifier for the dispatch context.
+ * @return A {@link DispatchContext} that can be used to request dispatch
+ */
+ public DispatchContext register(Dispatchable dispatchable, String name);
+
+ /**
+ * Creates an executor that will execute its tasks at the specified
+ * priority.
+ *
+ * @param priority
+ * The priority
+ * @return A prioritized executor.
+ */
+ public Executor createPriorityExecutor(int priority);
+
+ /**
+ * Starts the dispatcher.
+ */
+ public void start();
+
+ /**
+ * Shuts down the dispatcher, this may result in previous dispatch requests
+ * going unserved.
+ */
+ public void shutdown() throws InterruptedException;
+
+ /**
+ * Schedules the given {@link Runnable} to be run at the specified time in
+ * the future on this {@link IDispatcher}.
+ *
+ * @param runnable
+ * The Runnable to execute
+ * @param delay
+ * The delay
+ * @param timeUnit
+ * The TimeUnit used to interpret delay.
+ */
+ public void schedule(final Runnable runnable, long delay, TimeUnit timeUnit);
+
+}
\ No newline at end of file
Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityDispatcher.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityDispatcher.java?rev=743476&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityDispatcher.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityDispatcher.java Wed Feb 11 20:12:28 2009
@@ -0,0 +1,453 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.dispatch;
+
+import java.util.LinkedList;
+import java.util.TreeMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.activemq.dispatch.ExecutionLoadBalancer.LoadBalancedDispatchContext;
+import org.apache.activemq.dispatch.ExecutionLoadBalancer.PoolableDispatchContext;
+import org.apache.activemq.dispatch.ExecutionLoadBalancer.PoolableDispatcher;
+import org.apache.activemq.queue.Mapper;
+import org.apache.kahadb.util.LinkedNode;
+import org.apache.kahadb.util.LinkedNodeList;
+
+public class PriorityDispatcher implements Runnable, PoolableDispatcher {
+
+ private Thread thread;
+ private boolean running = false;
+ private boolean threaded = false;
+ private final int MAX_USER_PRIORITY;
+
+ static final ThreadLocal<PriorityDispatcher> dispatcher = new ThreadLocal<PriorityDispatcher>();
+
+ private final ExecutionLoadBalancer loadBalancer;
+
+ // The local dispatch queue:
+ private final PriorityLinkedList<PriorityDispatchContext> priorityQueue;
+
+ // Dispatch queue for requests from other threads:
+ private final LinkedNodeList<ForeignEvent> foreignQueue = new LinkedNodeList<ForeignEvent>();
+
+ // Timed Execution List
+ private final TimerHeap timerHeap = new TimerHeap();
+
+ private final String name;
+ private final AtomicBoolean foreignAvailable = new AtomicBoolean(false);
+ private final Semaphore foreignPermits = new Semaphore(0);
+
+ private final Mapper<Integer, PriorityDispatchContext> PRIORITY_MAPPER = new Mapper<Integer, PriorityDispatchContext>() {
+ public Integer map(PriorityDispatchContext element) {
+ return element.listPrio;
+ }
+ };
+
+ public PriorityDispatcher(String name, int priorities, ExecutionLoadBalancer loadBalancer) {
+ this.name = name;
+ MAX_USER_PRIORITY = priorities;
+ priorityQueue = new PriorityLinkedList<PriorityDispatchContext>(MAX_USER_PRIORITY + 1, PRIORITY_MAPPER);
+ this.loadBalancer = loadBalancer;
+ loadBalancer.addDispatcher(this);
+ }
+
+ private abstract class ForeignEvent extends LinkedNode<ForeignEvent> {
+ public abstract void execute();
+
+ final void addToList() {
+ synchronized (foreignQueue) {
+ if (!this.isLinked()) {
+ foreignQueue.addLast(this);
+ if (!foreignAvailable.getAndSet(true)) {
+ foreignPermits.release();
+ }
+ }
+ }
+ }
+ }
+
+ public boolean isThreaded() {
+ return threaded;
+ }
+
+ public void setThreaded(boolean threaded) {
+ this.threaded = threaded;
+ }
+
+ private class UpdateEvent extends ForeignEvent {
+ private final PriorityDispatchContext pdc;
+
+ UpdateEvent(PriorityDispatchContext pdc) {
+ this.pdc = pdc;
+ }
+
+ // Can only be called by the owner of this dispatch context:
+ public void execute() {
+ pdc.lbContext.processForeignUpdates();
+ }
+ }
+
+ class PriorityDispatchContext extends LinkedNode<PriorityDispatchContext> implements PoolableDispatchContext {
+ // The dispatchable target:
+ final Dispatchable dispatchable;
+ LoadBalancedDispatchContext lbContext;
+ // The name of this context:
+ final String name;
+ // list prio can only be updated in the thread of of this dispatcher:
+ int listPrio;
+ // The update event is used to update fields in the dispatch context
+ // from foreign threads:
+ final UpdateEvent updateEvent = new UpdateEvent(this);
+
+ // Marked by the caller when this is closed.
+ boolean closed = false;
+
+ private PriorityDispatchContext(Dispatchable dispatchable, boolean persistent, String name) {
+ super();
+ this.dispatchable = dispatchable;
+ this.name = name;
+ }
+
+ // The load balancer will guarantee that this is on our thread:
+ public final void requestDispatch() {
+ if (!isLinked()) {
+ priorityQueue.add(this, listPrio);
+ }
+ return;
+ }
+
+ // The load balancer guarantees that this is called on our thread:
+ public final void updatePriority(int priority) {
+ if (priority != listPrio) {
+
+ listPrio = priority;
+ // If there is a priority change relink the context
+ // at the new priority:
+ if (isLinked()) {
+ unlink();
+ priorityQueue.add(this, listPrio);
+ }
+ }
+ return;
+
+ }
+
+ public void onForeignThreadUpdate() {
+ updateEvent.addToList();
+ }
+
+ // Will only be called on this thread:
+ public void close() {
+ if (isLinked()) {
+ unlink();
+ }
+ synchronized (foreignQueue) {
+ if (updateEvent.isLinked()) {
+ updateEvent.unlink();
+ }
+ }
+
+ closed = true;
+ }
+
+ /**
+ * This can only be called by the owning dispatch thread:
+ *
+ * @return False if the dispatchable has more work to do.
+ */
+ public final boolean dispatch() {
+ return dispatchable.dispatch();
+ }
+
+ public String toString() {
+ return name;
+ }
+
+ public Dispatchable getDispatchable() {
+ return dispatchable;
+ }
+
+ public void setLoadBalancedDispatchContext(LoadBalancedDispatchContext context) {
+ this.lbContext = context;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public PoolableDispatcher getDispatcher() {
+ return PriorityDispatcher.this;
+ }
+ }
+
+ public DispatchContext register(Dispatchable dispatchable, String name) {
+ return loadBalancer.createLoadBalancedDispatchContext(createPoolablDispatchContext(dispatchable, name));
+ }
+
+ public PoolableDispatchContext createPoolablDispatchContext(Dispatchable dispatchable, String name) {
+ return new PriorityDispatchContext(dispatchable, true, name);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.activemq.dispatch.IDispatcher#start()
+ */
+ public synchronized final void start() {
+ if (thread == null) {
+ running = true;
+ thread = new Thread(this, name);
+ thread.start();
+ }
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.activemq.dispatch.IDispatcher#shutdown()
+ */
+ public synchronized final void shutdown() throws InterruptedException {
+ if (thread != null) {
+ dispatch(new RunnableAdapter(new Runnable() {
+
+ public void run() {
+ running = false;
+ }
+
+ }), MAX_USER_PRIORITY + 1);
+ thread.interrupt();
+ thread.join();
+ thread = null;
+ }
+ }
+
+ public void run() {
+
+ // Inform the dispatcher that we have started:
+ loadBalancer.onDispatcherStarted(this);
+ dispatcher.set(this);
+ PriorityDispatchContext pdc;
+ try {
+ while (running) {
+ pdc = priorityQueue.poll();
+ // If no local work available wait for foreign work:
+ if (pdc == null) {
+ foreignPermits.acquire();
+ } else {
+ pdc.lbContext.startingDispatch();
+
+ while (!pdc.dispatch()) {
+ // If there is a higher priority dispatchable stop
+ // processing this one:
+ if (pdc.listPrio < priorityQueue.getHighestPriority()) {
+ // May have gotten relinked by the caller:
+ if (!pdc.isLinked()) {
+ priorityQueue.add(pdc, pdc.listPrio);
+ }
+ break;
+ }
+ }
+
+ pdc.lbContext.finishedDispatch();
+
+ }
+
+ // Execute delayed events:
+ timerHeap.executeReadyEvents();
+
+ // Check for foreign dispatch requests:
+ if (foreignAvailable.get()) {
+ synchronized (foreignQueue) {
+ // Drain the foreign queue:
+ while (true) {
+ ForeignEvent fe = foreignQueue.getHead();
+ // TODO should probably swap foreign queue here:
+ if (fe == null) {
+ foreignAvailable.set(false);
+ foreignPermits.drainPermits();
+ break;
+ }
+
+ fe.unlink();
+ fe.execute();
+ }
+ }
+ }
+ }
+ } catch (InterruptedException e) {
+ return;
+ } catch (Throwable thrown) {
+ thrown.printStackTrace();
+ } finally {
+ loadBalancer.onDispatcherStopped(this);
+ }
+ }
+
+ class ThreadSafeDispatchContext implements LoadBalancedDispatchContext {
+ final PriorityDispatchContext delegate;
+
+ ThreadSafeDispatchContext(PriorityDispatchContext context) {
+ this.delegate = context;
+ delegate.setLoadBalancedDispatchContext(this);
+ }
+
+ public void finishedDispatch() {
+ // NOOP
+
+ }
+
+ public void startingDispatch() {
+ // Noop
+
+ }
+
+ public void close() {
+ // Noop this is always transient:
+ }
+
+ public void processForeignUpdates() {
+ requestDispatch();
+ }
+
+ public Dispatchable getDispatchable() {
+ return delegate.getDispatchable();
+ }
+
+ public void requestDispatch() {
+ if (dispatcher.get() == PriorityDispatcher.this) {
+ delegate.requestDispatch();
+ } else {
+ delegate.onForeignThreadUpdate();
+ }
+ }
+
+ public void updatePriority(int priority) {
+ throw new UnsupportedOperationException("Not implemented");
+ }
+
+ public String getName() {
+ return delegate.name;
+ }
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.activemq.dispatch.IDispatcher#dispatch(org.apache.activemq
+ * .dispatch.Dispatcher.Dispatchable)
+ */
+ final void dispatch(Dispatchable dispatchable, int priority) {
+ ThreadSafeDispatchContext context = new ThreadSafeDispatchContext(new PriorityDispatchContext(dispatchable, false, name));
+ context.delegate.updatePriority(priority);
+ context.requestDispatch();
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.activemq.dispatch.IDispatcher#createPriorityExecutor(int)
+ */
+ public Executor createPriorityExecutor(final int priority) {
+
+ return new Executor() {
+
+ public void execute(final Runnable runnable) {
+ dispatch(new RunnableAdapter(runnable), priority);
+ }
+ };
+ }
+
+ public void execute(final Runnable runnable) {
+ dispatch(new RunnableAdapter(runnable), 0);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.activemq.dispatch.IDispatcher#schedule(java.lang.Runnable,
+ * long, java.util.concurrent.TimeUnit)
+ */
+ public void schedule(final Runnable runnable, final long delay, final TimeUnit timeUnit) {
+ if (dispatcher.get() == this) {
+ timerHeap.add(runnable, delay, timeUnit);
+ } else {
+ new ForeignEvent() {
+ public void execute() {
+ timerHeap.add(runnable, delay, timeUnit);
+ }
+ }.addToList();
+ }
+ }
+
+ public String toString() {
+ return name;
+ }
+
+ private class TimerHeap {
+
+ final TreeMap<Long, LinkedList<Runnable>> timers = new TreeMap<Long, LinkedList<Runnable>>();
+
+ private void add(Runnable runnable, long delay, TimeUnit timeUnit) {
+
+ long nanoDelay = timeUnit.convert(delay, TimeUnit.NANOSECONDS);
+ long eTime = System.nanoTime() + nanoDelay;
+ LinkedList<Runnable> list = new LinkedList<Runnable>();
+ list.add(runnable);
+
+ LinkedList<Runnable> old = timers.put(eTime, list);
+ if (old != null) {
+ list.addAll(old);
+ }
+ }
+
+ private void executeReadyEvents() {
+ LinkedList<Runnable> ready = null;
+ if (timers.isEmpty()) {
+ return;
+ } else {
+ long now = System.nanoTime();
+ long first = timers.firstKey();
+ if (first > now) {
+ return;
+ }
+ ready = new LinkedList<Runnable>();
+
+ while (first < now) {
+ ready.addAll(timers.remove(first));
+ if (timers.isEmpty()) {
+ break;
+ }
+ first = timers.firstKey();
+
+ }
+ }
+
+ for (Runnable runnable : ready) {
+ try {
+ runnable.run();
+ } catch (Throwable thrown) {
+ thrown.printStackTrace();
+ }
+ }
+ }
+ }
+}
Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityLinkedList.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityLinkedList.java?rev=743476&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityLinkedList.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityLinkedList.java Wed Feb 11 20:12:28 2009
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.dispatch;
+
+import java.util.ArrayList;
+
+import org.apache.activemq.queue.Mapper;
+import org.apache.kahadb.util.LinkedNode;
+import org.apache.kahadb.util.LinkedNodeList;
+
+public class PriorityLinkedList<E extends LinkedNode<E>> {
+ // One linked list per priority level; elements are polled from the highest non-empty level.
+ private Mapper<Integer, E> priorityMapper;
+ private final ArrayList<LinkedNodeList<E>> priorityLists;
+ private int highestPriority = 0;
+
+ public PriorityLinkedList(int numPriorities) {
+ this(numPriorities, null);
+ }
+ // Creates lists for priorities 0..numPriorities inclusive.
+ public PriorityLinkedList(int numPriorities, Mapper<Integer, E> priorityMapper) {
+ this.priorityMapper = priorityMapper;
+ priorityLists = new ArrayList<LinkedNodeList<E>>();
+ for (int i = 0; i <= numPriorities; i++) {
+ priorityLists.add(new LinkedNodeList<E>());
+ }
+ }
+ // Upper bound on the highest occupied priority; it is only lowered lazily by poll()/peek().
+ public final int getHighestPriority() {
+ return highestPriority;
+ }
+
+ /**
+ * Removes and returns the element at the front of the list:
+ *
+ * @return the head of the highest non-empty priority list, or null if there is none
+ */
+ public final E poll() {
+ LinkedNodeList<E> ll = getHighestPriorityList();
+ if (ll == null) {
+ return null;
+ }
+ E node = ll.getHead();
+ node.unlink();
+
+ return node;
+ }
+ // True when no element is queued at any priority (the original returned the inverse).
+ public final boolean isEmpty() {
+ return peek() == null;
+ }
+
+ /**
+ * Gets the element at the front of the list without removing it:
+ *
+ * @return the head of the highest non-empty priority list, or null if there is none
+ */
+ public final E peek() {
+ LinkedNodeList<E> ll = getHighestPriorityList();
+ if (ll == null) {
+ return null;
+ }
+
+ return ll.getHead();
+ }
+ // Adds the element at the priority reported by the priority mapper (a mapper must be set).
+ public final void add(E element) {
+ int prio = priorityMapper.map(element);
+ add(element, prio);
+ }
+ // Appends the element to the list for the given priority.
+ public final void add(E element, int prio) {
+ LinkedNodeList<E> ll = priorityLists.get(prio);
+ ll.addLast(element);
+ if (prio > highestPriority) {
+ highestPriority = prio;
+ }
+ }
+ // Walks down from highestPriority to the first non-empty list; returns null if all are empty.
+ private final LinkedNodeList<E> getHighestPriorityList() {
+ LinkedNodeList<E> ll = priorityLists.get(highestPriority);
+ while (ll.isEmpty()) {
+ if (highestPriority == 0) {
+ return null;
+ }
+ highestPriority--;
+ ll = priorityLists.get(highestPriority);
+ }
+
+ return ll;
+ }
+
+ public Mapper<Integer, E> getPriorityMapper() {
+ return priorityMapper;
+ }
+
+ public void setPriorityMapper(Mapper<Integer, E> priorityMapper) {
+ this.priorityMapper = priorityMapper;
+ }
+
+}
Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityMap.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityMap.java?rev=743476&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityMap.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityMap.java Wed Feb 11 20:12:28 2009
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.dispatch;
+
+import java.util.Arrays;
+
+ /**
+  * A sparse map from integer priority keys to values, backed by a flat array
+  * in which slot {@code i} holds the value for key {@code base + i}. A null
+  * slot means "no mapping", so values must be non-null. The class performs no
+  * synchronization of its own.
+  *
+  * @param <E> the value type
+  */
+ public class PriorityMap<E> {
+
+     // Lower bound for the index of the first occupied slot; advanced lazily
+     // by firstValue()/firstKey() after removals.
+     int first;
+     // The key that slot 0 of the array stands for.
+     int base;
+     // Number of occupied (non-null) slots.
+     int size;
+
+     Object elements[] = new Object[1];
+
+     /**
+      * Associates the value with the key, growing or rebasing the backing
+      * array as required.
+      *
+      * @param key the priority key
+      * @param value the value to store; must not be null
+      * @return the value previously stored under the key, or null if none
+      */
+     public E put(int key, E value) {
+         E rc = null;
+         if (isEmpty()) {
+             // This will be the first base priority.
+             base = key;
+             elements[0] = value;
+             first = 0;
+         } else if (key >= base) {
+             // The key is at or after the current base (the original sent
+             // key == base down the rebase path, losing the old value and
+             // counting the key twice). We may need to expand the array.
+             if (elements.length <= key - base) {
+                 // Growing also rebases the array at 'first' when leading
+                 // entries have been removed.
+                 resize((key - base) + 1, 0);
+             }
+             // Compute the slot only now: resize() may have moved 'base'.
+             int index = key - base;
+             if (index < first) {
+                 first = index;
+             }
+             rc = element(index);
+             elements[index] = value;
+         } else {
+             // The key is before the current base, so shift the existing
+             // entries right and make this key the new base.
+             int oldLastIndex = indexOfLast();
+             int newLastIndex = (base + oldLastIndex) - key;
+             resize(newLastIndex + 1, first + (base - key), (oldLastIndex - first) + 1);
+             elements[0] = value;
+             first = 0;
+         }
+         if (rc == null) {
+             size++;
+         }
+         return rc;
+     }
+
+     /**
+      * @return the index of the last occupied slot, or -1 if none is occupied
+      */
+     private int indexOfLast() {
+         for (int i = elements.length - 1; i >= 0; i--) {
+             if (elements[i] != null) {
+                 return i;
+             }
+         }
+         return -1;
+     }
+
+     /**
+      * Resizes the array, copying as many slots from 'first' onwards as fit.
+      */
+     private void resize(int newSize, int firstOffset) {
+         int count = Math.min(elements.length - first, newSize);
+         resize(newSize, firstOffset, count);
+     }
+
+     /**
+      * Moves {@code copyCount} slots starting at 'first' so that they start
+      * at {@code firstOffset} in an array of {@code newSize} slots, then
+      * adjusts 'base' and 'first' to match the new layout.
+      */
+     private void resize(int newSize, int firstOffset, int copyCount) {
+         Object t[];
+         if (elements.length == newSize) {
+             // Same capacity: shift in place and clear the vacated prefix.
+             t = elements;
+             System.arraycopy(elements, first, t, firstOffset, copyCount);
+             Arrays.fill(t, 0, firstOffset, null);
+         } else {
+             t = new Object[newSize];
+             System.arraycopy(elements, first, t, firstOffset, copyCount);
+         }
+         base += (first - firstOffset);
+         // The copied run now begins at firstOffset; the original left
+         // 'first' pointing at the old position, which skipped entries.
+         first = firstOffset;
+         elements = t;
+     }
+
+     /**
+      * @param priority the priority key to look up
+      * @return the value stored under the key, or null if there is none
+      */
+     public E get(int priority) {
+         int index = priority - base;
+         if (index < 0 || index >= elements.length) {
+             return null;
+         }
+         return element(index);
+     }
+
+     @SuppressWarnings("unchecked")
+     private E element(int index) {
+         // Safe: put() is the only writer and it only stores E values.
+         return (E) elements[index];
+     }
+
+     /**
+      * Removes the mapping for the key, if any.
+      *
+      * @param priority the priority key to remove
+      * @return the removed value, or null if the key was not mapped
+      */
+     public E remove(int priority) {
+         int index = priority - base;
+         if (index < 0 || index >= elements.length) {
+             return null;
+         }
+         E rc = element(index);
+         elements[index] = null;
+         if (rc != null) {
+             size--;
+         }
+         return rc;
+     }
+
+     /**
+      * @return true when the map holds no values
+      */
+     public boolean isEmpty() {
+         return size == 0;
+     }
+
+     /**
+      * @return the value stored under the lowest key, or null if the map is empty
+      */
+     public E firstValue() {
+         if (size == 0) {
+             return null;
+         }
+         return element(advanceFirst());
+     }
+
+     /**
+      * @return the lowest key that has a value, or null if the map is empty
+      */
+     public Integer firstKey() {
+         if (size == 0) {
+             return null;
+         }
+         // Translate the slot index back into a key; the original returned
+         // the raw index.
+         return base + advanceFirst();
+     }
+
+     /**
+      * Moves 'first' forward to the first occupied slot. The first element
+      * may have been removed, so it has to be searched for. Must only be
+      * called while the map is non-empty.
+      */
+     private int advanceFirst() {
+         while (elements[first] == null) {
+             first++;
+         }
+         return first;
+     }
+
+ }
Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityPooledDispatcher.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityPooledDispatcher.java?rev=743476&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityPooledDispatcher.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityPooledDispatcher.java Wed Feb 11 20:12:28 2009
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.dispatch;
+
+import java.util.ArrayList;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+ /**
+  * An {@link IDispatcher} that spreads work over a fixed set of
+  * {@link PriorityDispatcher} workers which share one
+  * {@link SimpleLoadBalancer}.
+  */
+ public class PriorityPooledDispatcher implements IDispatcher {
+     private final String name;
+
+     final AtomicBoolean started = new AtomicBoolean();
+     final AtomicBoolean shutdown = new AtomicBoolean();
+
+     ArrayList<PriorityDispatcher> dispatchers = new ArrayList<PriorityDispatcher>();
+     // Index of the worker handed out last; guarded by the 'dispatchers' monitor.
+     private int roundRobinCounter = 0;
+     private final int size;
+
+     private final SimpleLoadBalancer executionGraphLoadBalancer;
+
+     /*
+      * (non-Javadoc)
+      *
+      * @see org.apache.activemq.dispatch.IDispatcher#createPriorityExecutor(int)
+      */
+     public Executor createPriorityExecutor(final int priority) {
+         return new Executor() {
+             public void execute(final Runnable runnable) {
+                 // Pass the requested priority through; the original always
+                 // dispatched with 0 and ignored the argument.
+                 // NOTE(review): assumes the second argument of
+                 // PriorityDispatcher.dispatch is the priority - confirm.
+                 chooseDispatcher().dispatch(new RunnableAdapter(runnable), priority);
+             }
+         };
+     }
+
+     /**
+      * Creates the pool and its workers; nothing runs until {@link #start()}.
+      *
+      * @param name the pool name, also used as the prefix of the worker names
+      * @param size the number of workers to create
+      * @param priorities passed to every worker's constructor
+      */
+     public PriorityPooledDispatcher(String name, int size, int priorities) {
+         this.name = name;
+         this.size = size;
+         executionGraphLoadBalancer = new SimpleLoadBalancer(name);
+         // Create all the workers.
+         for (int i = 0; i < size; i++) {
+             PriorityDispatcher dispatcher = new PriorityDispatcher(name + "-" + (i + 1), priorities, executionGraphLoadBalancer);
+             dispatchers.add(dispatcher);
+         }
+     }
+
+     /**
+      * Registers the dispatchable with the worker picked by
+      * {@link #chooseDispatcher()}.
+      */
+     public DispatchContext register(Dispatchable dispatchable, String name) {
+         return chooseDispatcher().register(dispatchable, name);
+     }
+
+     /**
+      * Starts every worker and then the load balancer. Only the first call
+      * has any effect.
+      *
+      * @see org.apache.activemq.dispatch.IDispatcher#start()
+      */
+     public synchronized final void start() {
+         if (started.compareAndSet(false, true)) {
+             // Start all the workers.
+             for (int i = 0; i < size; i++) {
+                 dispatchers.get(i).start();
+             }
+             try {
+                 executionGraphLoadBalancer.start();
+             } catch (InterruptedException e) {
+                 // start() cannot rethrow, so keep the interrupt visible to
+                 // the caller instead of swallowing it.
+                 Thread.currentThread().interrupt();
+             }
+         }
+     }
+
+     /*
+      * (non-Javadoc)
+      *
+      * @see org.apache.activemq.dispatch.IDispatcher#shutdown()
+      */
+     public synchronized final void shutdown() throws InterruptedException {
+         shutdown.set(true);
+         for (PriorityDispatcher dispatcher : dispatchers) {
+             dispatcher.shutdown();
+         }
+         executionGraphLoadBalancer.shutdown();
+     }
+
+     /**
+      * Picks the worker to use: the one bound to the calling thread when
+      * there is one, otherwise the next worker in round-robin order.
+      */
+     private PriorityDispatcher chooseDispatcher() {
+         PriorityDispatcher d = PriorityDispatcher.dispatcher.get();
+         if (d != null) {
+             return d;
+         }
+         synchronized (dispatchers) {
+             if (++roundRobinCounter >= size) {
+                 roundRobinCounter = 0;
+             }
+             return dispatchers.get(roundRobinCounter);
+         }
+     }
+
+     /**
+      * Dispatches the runnable on the chosen worker, passing 0 as the
+      * second argument of dispatch.
+      */
+     public void execute(final Runnable runnable) {
+         chooseDispatcher().dispatch(new RunnableAdapter(runnable), 0);
+     }
+
+     /*
+      * (non-Javadoc)
+      *
+      * @see
+      * org.apache.activemq.dispatch.IDispatcher#schedule(java.lang.Runnable,
+      * long, java.util.concurrent.TimeUnit)
+      */
+     public void schedule(final Runnable runnable, long delay, TimeUnit timeUnit) {
+         chooseDispatcher().schedule(runnable, delay, timeUnit);
+     }
+
+     public String toString() {
+         return name;
+     }
+ }
Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/SimpleLoadBalancer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/SimpleLoadBalancer.java?rev=743476&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/SimpleLoadBalancer.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/SimpleLoadBalancer.java Wed Feb 11 20:12:28 2009
@@ -0,0 +1,292 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.dispatch;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.dispatch.IDispatcher.Dispatchable;
+
+/**
+ *
+ */
+public class SimpleLoadBalancer implements ExecutionLoadBalancer {
+
+ private static final ThreadLocal<ExecutionGraphNode> dispatchContext = new ThreadLocal<ExecutionGraphNode>();
+ private static final ThreadLocal<PoolableDispatcher> dispatcher = new ThreadLocal<PoolableDispatcher>();
+
+ private final ArrayList<PoolableDispatcher> dispatchers = new ArrayList<PoolableDispatcher>();
+
+ private final String name;
+ private final boolean DEBUG = false;
+
+ SimpleLoadBalancer(String name) {
+ this.name = name;
+ }
+
+ public void start() throws InterruptedException {
+ }
+
+ public void shutdown() {
+ }
+
+ public LoadBalancedDispatchContext createLoadBalancedDispatchContext(PoolableDispatchContext context) {
+ ExecutionGraphNode egn = new ExecutionGraphNode(context);
+ return egn;
+ }
+
+ public synchronized final void addDispatcher(PoolableDispatcher dispatcher) {
+ dispatchers.add(dispatcher);
+ }
+
+ /**
+ * A Dispatcher must call this to indicate that is has started it's dispatch
+ * loop.
+ */
+ public void onDispatcherStarted(PoolableDispatcher d) {
+ dispatcher.set(d);
+ }
+
+ /**
+ * A Dispatcher must call this when exiting it's dispatch loop
+ */
+ public void onDispatcherStopped(PoolableDispatcher d) {
+
+ }
+
+ private class ExecutionGraphEdge {
+ final ExecutionGraphNode target;
+ final ExecutionGraphNode source;
+ int count;
+
+ ExecutionGraphEdge(ExecutionGraphNode source, ExecutionGraphNode target) {
+ this.target = target;
+ this.source = source;
+ }
+
+ public String toString() {
+ return "Connection from: " + source + " to " + target;
+ }
+ }
+
+ /**
+ * ExecutionGraphNode tracks dispatch information for a
+ * MappableDispatchContext.
+ *
+ */
+ public class ExecutionGraphNode implements LoadBalancedDispatchContext {
+ protected PoolableDispatchContext context;
+ private ExecutionGraphNode singleSource;
+ private final HashMap<ExecutionGraphNode, ExecutionGraphEdge> sources = new HashMap<ExecutionGraphNode, ExecutionGraphEdge>();
+ protected PoolableDispatcher currentOwner;
+ private final AtomicInteger work = new AtomicInteger(0);
+
+ private int priority;
+ private boolean dispatchRequested = false;
+ private PoolableDispatcher updateDispatcher = null;
+
+ ExecutionGraphNode(PoolableDispatchContext context) {
+ this.context = context;
+ this.context.setLoadBalancedDispatchContext(this);
+ this.currentOwner = context.getDispatcher();
+ if (DEBUG) {
+ System.out.println(getName() + " Assigned to " + context.getDispatcher());
+ }
+ }
+
+ public final void startingDispatch() {
+ dispatchContext.set(this);
+ }
+
+ public final void finishedDispatch() {
+ dispatchContext.set(null);
+ }
+
+ /**
+ * This method is called to track which dispatch contexts are requesting
+ * dispatch for the target context represented by this node.
+ *
+ * This method is not threadsafe, the caller must ensure serialized
+ * access to this method.
+ *
+ * @param callingDispatcher
+ * The calling dispatcher.
+ * @return True if this method resulted in the dispatch request being
+ * assigned to another dispatcher.
+ */
+ public final boolean onDispatchRequest(final PoolableDispatcher callingDispatcher) {
+
+ /*
+ * if (callingDispatcher == currentOwner) { return false; }
+ */
+
+ ExecutionGraphNode callingContext = dispatchContext.get();
+ if (callingContext != null) {
+ // Make sure we are being called by another node:
+ if (callingContext == null || callingContext == context) {
+ return false;
+ }
+
+ // Optimize for single source case:
+ if (singleSource != callingContext) {
+ if (singleSource == null && sources.isEmpty()) {
+ singleSource = callingContext;
+ ExecutionGraphEdge edge = new ExecutionGraphEdge(callingContext, this);
+ sources.put(callingContext, edge);
+
+ // If this context only has a single source
+ // immediately assign it to the
+ // dispatcher of the source:
+ boolean reassigned = false;
+ synchronized (this) {
+ if (callingDispatcher != currentOwner && updateDispatcher == null) {
+ updateDispatcher = callingDispatcher;
+ reassigned = true;
+ if (DEBUG)
+ System.out.println("Assigning: " + this + " to " + callingContext + "'s dispatcher: " + callingDispatcher);
+
+ }
+ }
+ if (reassigned) {
+ assignToNewDispatcher(callingDispatcher);
+ }
+ return true;
+ } else {
+
+ ExecutionGraphEdge stats = sources.get(callingContext);
+ if (stats == null) {
+ stats = new ExecutionGraphEdge(callingContext, this);
+ sources.put(callingContext, stats);
+ }
+
+ if (singleSource != null) {
+ singleSource = null;
+ }
+ }
+ }
+ work.incrementAndGet();
+ }
+ return false;
+ }
+
+ final void assignToNewDispatcher(PoolableDispatcher newDispatcher) {
+ synchronized (this) {
+ if (newDispatcher != currentOwner) {
+ updateDispatcher = newDispatcher;
+ }
+ }
+ context.onForeignThreadUpdate();
+ }
+
+ public void requestDispatch() {
+
+ PoolableDispatcher callingDispatcher = dispatcher.get();
+
+ if (onDispatchRequest(callingDispatcher)) {
+ return;
+ }
+
+ // Otherwise this is coming off another thread, so we need to
+ // synchronize
+ // to protect against ownership changes:
+ synchronized (this) {
+ // If the owner of this context is the calling thread, then
+ // delegate to the dispatcher.
+ if (currentOwner == callingDispatcher) {
+
+ context.requestDispatch();
+ return;
+ }
+
+ dispatchRequested = true;
+ }
+ context.onForeignThreadUpdate();
+ }
+
+ public void updatePriority(int priority) {
+ if (this.priority == priority) {
+ return;
+ }
+ // Otherwise this is coming off another thread, so we need to
+ // synchronize
+ // to protect against ownership changes:
+ synchronized (this) {
+ this.priority = priority;
+
+ IDispatcher callingDispatcher = dispatcher.get();
+
+ // If the owner of this context is the calling thread, then
+ // delegate to the dispatcher.
+ if (currentOwner == callingDispatcher) {
+
+ context.updatePriority(priority);
+ return;
+ }
+ }
+ context.onForeignThreadUpdate();
+ }
+
+ public void processForeignUpdates() {
+ boolean ownerChange = false;
+ synchronized (this) {
+ if (updateDispatcher != null) {
+ // Close the old context:
+ if (DEBUG) {
+ System.out.println("Assigning " + getName() + " to " + updateDispatcher);
+ }
+ context.close();
+
+ currentOwner = updateDispatcher;
+ updateDispatcher = null;
+ context = currentOwner.createPoolablDispatchContext(context.getDispatchable(), context.getName());
+ dispatchRequested = true;
+ context.updatePriority(priority);
+ context.setLoadBalancedDispatchContext(this);
+ ownerChange = true;
+ } else {
+ context.updatePriority(priority);
+
+ if (dispatchRequested) {
+ context.requestDispatch();
+ dispatchRequested = false;
+ }
+ }
+ }
+
+ if (ownerChange) {
+ context.onForeignThreadUpdate();
+ }
+ }
+
+ public void close() {
+ sources.clear();
+ }
+
+ public final String toString() {
+ return context.toString();
+ }
+
+ public Dispatchable getDispatchable() {
+ return context.getDispatchable();
+ }
+
+ public String getName() {
+ return context.getName();
+ }
+ }
+}
Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/AbstractLimitedFlowResource.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/AbstractLimitedFlowResource.java?rev=743476&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/AbstractLimitedFlowResource.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/AbstractLimitedFlowResource.java Wed Feb 11 20:12:28 2009
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.flow;
+
+import java.util.HashMap;
+import java.util.HashSet;
+
+ /**
+  * Base class for flow resources that keeps the set of currently open
+  * {@link FlowController}s, keyed by {@link Flow}, and tells registered
+  * {@link FlowLifeCycleListener}s when flows open and close.
+  *
+  * @param <E> the element type of the flow controllers
+  */
+ public abstract class AbstractLimitedFlowResource<E> implements IFlowResource {
+     private final HashSet<FlowLifeCycleListener> lifeCycleWatchers = new HashSet<FlowLifeCycleListener>();
+     private final HashMap<Flow, FlowController<E>> openControllers = new HashMap<Flow, FlowController<E>>();
+
+     private final long resourceId = RESOURCE_COUNTER.incrementAndGet();
+
+     private String resourceName;
+
+     protected AbstractLimitedFlowResource() {
+
+     }
+
+     protected AbstractLimitedFlowResource(String name) {
+         this.resourceName = name;
+     }
+
+     public long getResourceId() {
+         return resourceId;
+     }
+
+     public String getResourceName() {
+         return resourceName;
+     }
+
+     protected void setResourceName(String resourceName) {
+         this.resourceName = resourceName;
+     }
+
+     /**
+      * Registers the listener and immediately reports every flow that is
+      * already open to it.
+      *
+      * @param listener the listener to add
+      */
+     public synchronized final void addFlowLifeCycleListener(FlowLifeCycleListener listener) {
+         lifeCycleWatchers.add(listener);
+         // Notify the watchers of all flows that are already open:
+         for (FlowController<E> controller : openControllers.values()) {
+             listener.onFlowOpened(this, controller.getFlow());
+         }
+     }
+
+     /**
+      * Subclasses must call this whenever a new {@link ISinkController} is
+      * opened.
+      *
+      * @param controller
+      *            The new controller.
+      * @throws IllegalStateException
+      *             if a different controller is already open for the same
+      *             flow; the existing registration is left untouched.
+      */
+     protected synchronized final void onFlowOpened(FlowController<E> controller) {
+         Flow flow = controller.getFlow();
+         // Check before modifying the map instead of replacing the entry and
+         // putting the old one back.
+         FlowController<E> existing = openControllers.get(flow);
+         if (existing != null && existing != controller) {
+             throw new IllegalStateException("Flow already opened: " + existing);
+         }
+         openControllers.put(flow, controller);
+
+         for (FlowLifeCycleListener listener : lifeCycleWatchers) {
+             listener.onFlowOpened(this, flow);
+         }
+     }
+
+     /**
+      * Removes the controller of the given flow and, if there was one,
+      * notifies every listener that the flow has closed.
+      *
+      * @param flow the flow that was closed
+      */
+     protected synchronized final void onFlowClosed(Flow flow) {
+         FlowController<E> existing = openControllers.remove(flow);
+
+         if (existing != null) {
+             for (FlowLifeCycleListener listener : lifeCycleWatchers) {
+                 listener.onFlowClosed(this, existing.getFlow());
+             }
+         }
+     }
+
+     public synchronized void removeFlowLifeCycleListener(FlowLifeCycleListener listener) {
+         lifeCycleWatchers.remove(listener);
+     }
+
+     /**
+      * Gets the flow controller corresponding to the specified flow.
+      *
+      * @param flow
+      *            The flow
+      * @return The FlowController, or null if the flow is not open
+      */
+     public synchronized FlowController<E> getFlowController(Flow flow) {
+         return openControllers.get(flow);
+     }
+ }
Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/AbstractLimitedFlowSource.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/AbstractLimitedFlowSource.java?rev=743476&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/AbstractLimitedFlowSource.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/AbstractLimitedFlowSource.java Wed Feb 11 20:12:28 2009
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.flow;
+
+ /**
+  * A flow resource that additionally holds an auto-release flag and the
+  * {@link IFlowDrain} its elements are drained to.
+  *
+  * @param <E> the element type
+  */
+ public abstract class AbstractLimitedFlowSource<E> extends AbstractLimitedFlowResource<E> {
+
+     protected boolean autoRelease = false;
+     protected IFlowDrain<E> drain;
+
+     /** Creates an unnamed source. */
+     protected AbstractLimitedFlowSource() {
+         super();
+     }
+
+     /**
+      * Creates a source with the given resource name.
+      *
+      * @param name the resource name
+      */
+     protected AbstractLimitedFlowSource(String name) {
+         super(name);
+     }
+
+     /**
+      * Sets whether space is released automatically on dequeue. When set,
+      * the caller does not need to call IFlowController.elementDispatched().
+      *
+      * @param val true to enable automatic release
+      */
+     public synchronized void setAutoRelease(boolean val) {
+         this.autoRelease = val;
+     }
+
+     /**
+      * Returns whether or not this {@link IFlowSource} is set to
+      * automatically release elements via
+      * {@link FlowController#elementDispatched(Object)} during dispatch.
+      * When auto release is set the caller <i>must</i> not call
+      * {@link FlowController#elementDispatched(Object)}.
+      *
+      * @return the current auto-release setting
+      */
+     public synchronized boolean getAutoRelease() {
+         return this.autoRelease;
+     }
+
+     /**
+      * Sets the drain that elements of this source are handed to.
+      *
+      * @param drain the new drain
+      */
+     public synchronized void setDrain(IFlowDrain<E> drain) {
+         this.drain = drain;
+     }
+
+ }
Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/AbstractLimiter.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/AbstractLimiter.java?rev=743476&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/AbstractLimiter.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/AbstractLimiter.java Wed Feb 11 20:12:28 2009
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.flow;
+
+import java.util.LinkedList;
+
+ /**
+  * Base {@link IFlowLimiter} that keeps a queue of one-shot
+  * {@link UnThrottleListener}s and notifies them while the limiter is not
+  * throttled. The class performs no synchronization of its own.
+  *
+  * @param <E> the element type being limited
+  */
+ public abstract class AbstractLimiter<E> implements IFlowLimiter<E> {
+
+     private final LinkedList<UnThrottleListener> throttleListeners = new LinkedList<UnThrottleListener>();
+     // True while notifyUnThrottleListeners() is draining the queue, so that
+     // a listener added during the drain does not start a nested drain.
+     private boolean resuming;
+
+     public AbstractLimiter() {
+     }
+
+     /**
+      * Queues the listener and, unless a notification pass is already
+      * running or the limiter is throttled, notifies the queued listeners
+      * right away.
+      *
+      * @param l the listener to notify once the limiter is not throttled
+      */
+     public final void addUnThrottleListener(UnThrottleListener l) {
+         throttleListeners.add(l);
+
+         if (!resuming && !getThrottled()) {
+             notifyUnThrottleListeners();
+         }
+     }
+
+     /**
+      * Removes and notifies queued listeners in order until the queue is
+      * empty or the limiter reports that it is throttled again.
+      */
+     public final void notifyUnThrottleListeners() {
+         resuming = true;
+         try {
+             while (!getThrottled() && !throttleListeners.isEmpty()) {
+                 UnThrottleListener l = throttleListeners.remove();
+                 l.onUnthrottled();
+             }
+         } finally {
+             // Reset even if a listener throws; otherwise
+             // addUnThrottleListener() would never notify again.
+             resuming = false;
+         }
+     }
+
+ }
Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/Flow.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/Flow.java?rev=743476&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/Flow.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/Flow.java Wed Feb 11 20:12:28 2009
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.flow;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ */
+final public class Flow {
+ static final private AtomicLong FLOW_COUNTER = new AtomicLong();
+
+ final private long flowID;
+ final private String flowName;
+ final private boolean dynamic;
+ final private int hashCode;
+
+ /**
+ * Package scoped constructor.
+ *
+ * @param name
+ * The flow name.
+ * @param id
+ * The flow id.
+ */
+ public Flow(String name, boolean dynamic) {
+ this.flowID = FLOW_COUNTER.incrementAndGet();
+ this.flowName = name;
+ this.dynamic = dynamic;
+ this.hashCode = (int) (flowID ^ (flowID >>> 32));
+ }
+
+ /**
+ * @see org.apache.activemq.flow.Flow#getFlowID()
+ */
+ public long getFlowID() {
+ return flowID;
+ }
+
+ /**
+ * @see Flow#getFlowName()
+ */
+ public String getFlowName() {
+ return flowName;
+ }
+
+ /**
+ * @see Flow#isDynamic()
+ */
+ public boolean isDynamic() {
+ return dynamic;
+ }
+
+ public int hashCode() {
+ return hashCode;
+ }
+
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+
+ if (o instanceof Flow) {
+ return equals((Flow) o);
+ }
+
+ return false;
+ }
+
+ public boolean equals(Flow flow) {
+ return flowID == flow.getFlowID();
+ }
+
+ public String toString() {
+ return getFlowName();
+ }
+
+}
Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java?rev=743476&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java Wed Feb 11 20:12:28 2009
@@ -0,0 +1,440 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.flow;
+
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+
+import org.apache.activemq.flow.IFlowLimiter.UnThrottleListener;
+
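+/**
+ * Flow-controls the elements of a single {@link Flow}. Acting as a sink, it
+ * admits elements through an {@link IFlowLimiter}, parks what the limiter
+ * cannot take in an overflow queue and blocks the offering sources; once the
+ * limiter unthrottles and the queue drains, the blocked sources are resumed on
+ * a shared executor. Acting as a source, it tracks the downstream sinks that
+ * are currently blocking it.
+ */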
+public class FlowController<E> implements ISinkController<E>, ISourceController<E> {
+
+ // Downstream sinks that have blocked this controller through onFlowBlock()
+ // and have not yet resumed it.
+ private final HashSet<ISinkController<E>> blockingSinks = new HashSet<ISinkController<E>>();
+
+ // Sources that this controller has blocked through blockSource() and still
+ // has to resume.
+ private final HashSet<ISourceController<E>> blockedSources = new HashSet<ISourceController<E>>();
+
+ // Listeners to notify the next time blocked sources are resumed. The set is
+ // cleared after every resume, so each registration fires at most once.
+ private final HashSet<FlowUnblockListener<E>> unblockListeners = new HashSet<FlowUnblockListener<E>>();
+
+ // Callback for the IFlowLimiter to notify us that it is unthrottled:
+ private final UnThrottleListener unthrottleListener;
+
+ // The flow being flow controlled:
+ protected Flow flow;
+
+ // Used to synchronize access to this controller by downstream sinks:
+ // private final ReentrantLock sinkLock = new
+ // java.util.concurrent.locks.ReentrantLock();
+
+ // True while at least one sink in blockingSinks is blocking this controller:
+ protected boolean blocked = false;
+
+ // Set to true while resuming. New Sources must wait to enqueue while
+ // old sources are being resumed.
+ private boolean resuming = false;
+
+ // True from the moment a resume task is handed to the executor until that
+ // task has finished; prevents scheduling a second one in the meantime.
+ private boolean resumeScheduled = false;
+
+ // The acceptor for elements from this flow. add() and offer() treat null as
+ // "no sink": they only update the limiter.
+ private FlowControllable<E> controllable;
+
+ // The limiter that decides whether elements can be admitted and whether the
+ // flow is throttled.
+ private IFlowLimiter<E> limiter;
+
+ // Monitor guarding this controller's mutable state; supplied through the
+ // constructor.
+ private Object mutex;
+
+ // When true (the default) an element the limiter cannot accept is not
+ // admitted; when false okToAdd() admits every element.
+ private boolean useOverFlowQueue = true;
+
+ // List of elements that were added while the flow is blocked.
+ // These aren't added to the resource until the flow becomes
+ // unblocked.
+ private LinkedList<E> overflowQueue = new LinkedList<E>();
+
+ // True while unthrottleListener is registered with the limiter:
+ private boolean throttleReg;
+ // Set by waitForFlowUnblock() while a thread waits on the mutex, so that
+ // dispatchOverflowQueue() knows to call notifyAll().
+ private boolean notifyUnblock = false;
+ // Label returned by toString() and used as the resume thread's name.
+ private String name;
+
+ public FlowController() {
+ this.unthrottleListener = new UnThrottleListener() {
+ public final void onUnthrottled() {
+ FlowController.this.onUnthrottled();
+ }
+ };
+ }
+
+ public FlowController(FlowControllable<E> controllable, Flow flow, IFlowLimiter<E> limiter, Object mutex) {
+ this();
+ this.controllable = controllable;
+ this.flow = flow;
+ this.limiter = limiter == null ? new SizeLimiter<E>(0, 0) : limiter;
+ this.mutex = mutex;
+ this.name = controllable.getFlowSource().toString();
+ }
+
+ /**
+  * @return The limiter currently used by this controller.
+  */
+ public final IFlowLimiter<E> getLimiter() {
+     return this.limiter;
+ }
+
+ /**
+  * Replaces the limiter and immediately re-evaluates the overflow queue and
+  * the blocked sources against it.
+  *
+  * @param limiter
+  *            The new limiter; if null a {@code SizeLimiter<E>(0, 0)} is
+  *            used, matching what the constructor does for a null limiter.
+  */
+ public final void setLimiter(IFlowLimiter<E> limiter) {
+     synchronized (mutex) {
+         // Storing null here would fail inside onUnthrottled() and leave the
+         // controller without a limiter.
+         this.limiter = limiter == null ? new SizeLimiter<E>(0, 0) : limiter;
+         // Clears the registration flag, so the unthrottle listener is
+         // registered with the new limiter the next time it is needed, and
+         // drains whatever the new limiter can now accept.
+         onUnthrottled();
+     }
+ }
+
+ /**
+  * Enables or disables overflow protection. While enabled, an element is
+  * only admitted if the limiter reports that it can be added; while disabled
+  * every element is admitted.
+  * <p>
+  * NOTE(review): the flag is written without holding the mutex, unlike the
+  * rest of this controller's state.
+  *
+  * @param val
+  *            true to enable overflow protection.
+  */
+ public final void useOverFlowQueue(boolean val) {
+     this.useOverFlowQueue = val;
+ }
+
+ /**
+  * @return The flow being controlled.
+  */
+ public final Flow getFlow() {
+     return this.flow;
+ }
+
+ /**
+  * Should be called by a resource any time its limits are exceeded. Records
+  * the sink as blocking this controller and marks the flow as blocked.
+  *
+  * @param sinkController
+  *            The sink that is now blocking this controller.
+  * @throws IllegalStateException
+  *             If that sink is already blocking this controller.
+  */
+ public final void onFlowBlock(ISinkController<E> sinkController) {
+     synchronized (mutex) {
+         final boolean newlyBlocking = blockingSinks.add(sinkController);
+         if (!newlyBlocking) {
+             throw new IllegalStateException(sinkController + " has already blocked: " + this);
+         }
+         // At least one sink is blocking us from here on.
+         blocked = true;
+     }
+ }
+
+ /**
+  * Called by a downstream sink that previously blocked this controller to
+  * signal that it no longer does. When the last blocking sink resumes, the
+  * flow is unblocked and the limiter's reserved space is released.
+  *
+  * @param sinkController
+  *            The sink that stops blocking this controller.
+  * @throws IllegalStateException
+  *             If that sink was not blocking this controller.
+  */
+ public final void onFlowResume(ISinkController<E> sinkController) {
+     synchronized (mutex) {
+         final boolean wasBlocking = blockingSinks.remove(sinkController);
+         if (!wasBlocking) {
+             throw new IllegalStateException(sinkController + " can't resume unblocked " + this);
+         }
+
+         if (blocked && blockingSinks.isEmpty()) {
+             blocked = false;
+             limiter.releaseReserved();
+         }
+     }
+ }
+
+ /**
+  * Must be called once the element has been sent to downstream sinks.
+  * <p>
+  * If this controller was blocked in the course of dispatching, the
+  * element's space is reserved in the limiter rather than removed; the
+  * reservation is released by {@code onFlowResume} once the last blocking
+  * sink resumes.
+  *
+  * @param elem
+  *            The dispatched element.
+  */
+ public final void elementDispatched(E elem) {
+     synchronized (mutex) {
+         if (blocked) {
+             limiter.reserve(elem);
+         } else {
+             limiter.remove(elem);
+         }
+     }
+ }
+
+ /**
+  * @return true if the limiter currently reports itself as throttled.
+  */
+ public final boolean isSinkBlocked() {
+     synchronized (mutex) {
+         final boolean throttled = limiter.getThrottled();
+         return throttled;
+     }
+ }
+
+ /**
+  * @return true while at least one downstream sink is blocking this
+  *         controller.
+  */
+ public final boolean isSourceBlocked() {
+     synchronized (mutex) {
+         final boolean blockedNow = blocked;
+         return blockedNow;
+     }
+ }
+
+ /**
+  * Blocks the calling thread until the limiter is no longer throttled.
+  *
+  * @throws InterruptedException
+  *             If interrupted while waiting.
+  */
+ public void waitForFlowUnblock() throws InterruptedException {
+     synchronized (mutex) {
+         for (;;) {
+             if (!limiter.getThrottled()) {
+                 break;
+             }
+             // Ask dispatchOverflowQueue() to notify the mutex, and make sure
+             // the limiter will call us back when it unthrottles.
+             notifyUnblock = true;
+             setUnThrottleListener();
+             mutex.wait();
+         }
+         notifyUnblock = false;
+     }
+ }
+
+ /**
+  * @return The flow source reported by the controlled resource.
+  */
+ public IFlowSource<E> getFlowSource() {
+     final IFlowSource<E> source = controllable.getFlowSource();
+     return source;
+ }
+
+ /**
+  * Adds an element to the sink associated with this resource if space is
+  * available. If no space is available the element is parked on the overflow
+  * queue and the source controller is blocked until this controller resumes
+  * it. The source is also blocked when the limiter reports, on adding the
+  * element, that it has become throttled.
+  *
+  * @param elem
+  *            The element to add.
+  * @param sourceController
+  *            The source flow controller; may be null, in which case nothing
+  *            is blocked.
+  */
+ public void add(E elem, ISourceController<E> sourceController) {
+     final boolean accepted;
+     synchronized (mutex) {
+         // If we don't have an fc sink, then just increment the limiter.
+         if (controllable == null) {
+             limiter.add(elem);
+             return;
+         }
+
+         accepted = okToAdd(elem);
+         final boolean mustBlock;
+         if (accepted) {
+             // limiter.add() returning true means the source has to block.
+             mustBlock = limiter.add(elem);
+         } else {
+             // No room: park the element until the limiter unthrottles.
+             overflowQueue.add(elem);
+             mustBlock = true;
+         }
+
+         if (mustBlock) {
+             blockSource(sourceController);
+             setUnThrottleListener();
+         }
+     }
+
+     // Deliver outside the mutex.
+     if (accepted) {
+         controllable.flowElemAccepted(this, elem);
+     }
+ }
+
+ /**
+  * Offers an element to the sink associated with this resource if space is
+  * available. If no space is available the source is blocked and false is
+  * returned; the element does not get added to the overflow list.
+  * <p>
+  * NOTE(review): unlike {@code add}, the accepted element is handed to the
+  * resource while the mutex is still held -- confirm this is intended.
+  *
+  * @param elem
+  *            The element to add.
+  * @param sourceController
+  *            The source flow controller; may be null, in which case nothing
+  *            is blocked.
+  * @return true if the element was accepted, false if it was refused.
+  */
+ public boolean offer(E elem, ISourceController<E> sourceController) {
+     synchronized (mutex) {
+         // If we don't have an fc sink, then just increment the limiter.
+         if (controllable == null) {
+             limiter.add(elem);
+             return true;
+         }
+
+         final boolean accepted = okToAdd(elem);
+         // Block the source when the element is refused, or when admitting
+         // it makes limiter.add() return true.
+         if (!accepted || limiter.add(elem)) {
+             blockSource(sourceController);
+             setUnThrottleListener();
+         }
+         if (accepted) {
+             controllable.flowElemAccepted(this, elem);
+         }
+         return accepted;
+     }
+ }
+
+ /**
+  * Decides whether an element may be admitted right now.
+  *
+  * @param elem
+  *            The candidate element.
+  * @return true if overflow protection is disabled or the limiter can take
+  *         the element.
+  */
+ private boolean okToAdd(E elem) {
+     if (!useOverFlowQueue) {
+         return true;
+     }
+     return limiter.canAdd(elem);
+ }
+
+ /**
+  * Accounts for the element in the limiter and hands it to the controlled
+  * resource, skipping whichever of the two is absent.
+  *
+  * @param elem
+  *            The element being admitted.
+  */
+ private void addToResource(E elem) {
+     if (limiter != null) {
+         limiter.add(elem);
+     }
+     if (controllable != null) {
+         controllable.flowElemAccepted(this, elem);
+     }
+ }
+
+ /**
+  * Registers the unthrottle callback with the limiter unless it is already
+  * registered.
+  */
+ private void setUnThrottleListener() {
+     if (throttleReg) {
+         return;
+     }
+     throttleReg = true;
+     limiter.addUnThrottleListener(unthrottleListener);
+ }
+
+ /**
+  * Invoked when the limiter reports that it is no longer throttled. Marks
+  * the unthrottle callback as unregistered and then drains the overflow
+  * queue.
+  */
+ public void onUnthrottled() {
+     synchronized (mutex) {
+         // Reset first: draining may need to register the callback again.
+         this.throttleReg = false;
+         this.dispatchOverflowQueue();
+     }
+ }
+
+ /**
+  * Blocks a source, unless it is null or already blocked by this
+  * controller. The callers in this class invoke it with the mutex held.
+  *
+  * @param source
+  *            The {@link ISourceController} of the source to be blocked.
+  */
+ protected void blockSource(final ISourceController<E> source) {
+     if (source == null) {
+         return;
+     }
+
+     // A resume in progress must finish before the blocked list may grow.
+     waitForResume();
+
+     // add() reports whether the source was newly inserted, so it is only
+     // notified the first time it is blocked.
+     if (blockedSources.add(source)) {
+         source.onFlowBlock(this);
+     }
+ }
+
+ /**
+  * Moves as many parked elements as the limiter accepts, in order, from the
+  * overflow queue into the resource, then checks whether blocked sources
+  * can be released.
+  */
+ private void dispatchOverflowQueue() {
+
+     // Stop at the first element the limiter refuses so order is kept.
+     while (!overflowQueue.isEmpty() && limiter.canAdd(overflowQueue.getFirst())) {
+         addToResource(overflowQueue.removeFirst());
+     }
+
+     // See if we can now unblock the sources.
+     checkUnblockSources();
+
+     if (overflowQueue.isEmpty()) {
+         // Fully drained: wake any thread in waitForFlowUnblock().
+         if (notifyUnblock) {
+             mutex.notifyAll();
+         }
+     } else {
+         // Elements remain parked, so ask the limiter to call back once it
+         // falls below its threshold again.
+         setUnThrottleListener();
+     }
+ }
+
+ /**
+  * Registers a listener to be called when the flow becomes unblocked, but
+  * only if the flow is blocked at the moment: the limiter is throttled or
+  * elements are parked on the overflow queue.
+  *
+  * @param listener
+  *            The listener.
+  * @return true if the listener was registered; false if the flow is not
+  *         blocked, in which case the listener is not registered.
+  */
+ public final boolean addUnblockListener(FlowUnblockListener<E> listener) {
+     synchronized (mutex) {
+         // Do not race with a resume that is delivering notifications.
+         waitForResume();
+         final boolean stillBlocked = limiter.getThrottled() || !overflowQueue.isEmpty();
+         if (stillBlocked) {
+             unblockListeners.add(listener);
+         }
+         return stillBlocked;
+     }
+ }
+
+ /**
+  * Waits on the mutex until no resume is in progress. Interrupts do not end
+  * the wait; the thread's interrupt status is restored before returning.
+  * The callers in this class invoke it with the mutex held.
+  */
+ private final void waitForResume() {
+     boolean sawInterrupt = false;
+     while (resuming) {
+         try {
+             mutex.wait();
+         } catch (InterruptedException ignored) {
+             // Keep waiting; the interrupt is re-asserted below.
+             sawInterrupt = true;
+         }
+     }
+
+     if (sawInterrupt) {
+         Thread.currentThread().interrupt();
+     }
+ }
+
+ /**
+  * Schedules the release of blocked sources and pending unblock listeners,
+  * provided that no release is already scheduled, the limiter isn't
+  * throttled and there are no elements on the overflow queue.
+  * <p>
+  * The callbacks run on the shared resume executor without the mutex held.
+  * While they run, {@code resuming} is true, which makes
+  * {@code blockSource} and {@code addUnblockListener} wait, so the two sets
+  * are not modified during iteration.
+  * <p>
+  * NOTE(review): a callback that re-enters {@code blockSource} or
+  * {@code addUnblockListener} of this controller on the resume thread would
+  * wait for a flag that only this thread clears -- confirm that callbacks
+  * never do so synchronously.
+  */
+ private void checkUnblockSources() {
+     if (resumeScheduled || limiter.getThrottled() || !overflowQueue.isEmpty()) {
+         return;
+     }
+     if (blockedSources.isEmpty() && unblockListeners.isEmpty()) {
+         return;
+     }
+
+     resumeScheduled = true;
+     Runnable resume = new Runnable() {
+         public void run() {
+             synchronized (mutex) {
+                 resuming = true;
+             }
+             String was = Thread.currentThread().getName();
+             try {
+                 // Label the pooled thread with this controller's name
+                 // for the duration of the callbacks.
+                 Thread.currentThread().setName(name);
+                 for (ISourceController<E> source : blockedSources) {
+                     source.onFlowResume(FlowController.this);
+                 }
+                 for (FlowUnblockListener<E> listener : unblockListeners) {
+                     listener.onFlowUnblocked(FlowController.this);
+                 }
+
+             } finally {
+                 synchronized (mutex) {
+                     blockedSources.clear();
+                     unblockListeners.clear();
+                     resuming = false;
+                     resumeScheduled = false;
+                     // Wakes threads parked in waitForResume().
+                     mutex.notifyAll();
+                 }
+                 Thread.currentThread().setName(was);
+             }
+         }
+     };
+
+     // If the executor refuses the task, the flag must be cleared again;
+     // otherwise no resume could ever be scheduled afterwards.
+     boolean submitted = false;
+     try {
+         RESUME_SERVICE.execute(resume);
+         submitted = true;
+     } finally {
+         if (!submitted) {
+             resumeScheduled = false;
+         }
+     }
+ }
+
+ // Executor shared by all controllers to run resume callbacks. Volatile
+ // because setFlowExecutor() may replace it from any thread, while
+ // controllers read it under their own, unrelated mutexes.
+ private static volatile Executor RESUME_SERVICE = Executors.newCachedThreadPool();
+
+ /**
+  * Replaces the executor that runs resume callbacks for all controllers.
+  *
+  * @param executor
+  *            The executor to use from now on; must not be null.
+  * @throws IllegalArgumentException
+  *             If {@code executor} is null.
+  */
+ public static final void setFlowExecutor(Executor executor) {
+     // Fail here rather than later, in the middle of scheduling a resume.
+     if (executor == null) {
+         throw new IllegalArgumentException("executor must not be null");
+     }
+     RESUME_SERVICE = executor;
+ }
+
+ /**
+  * @return The name of this controller.
+  */
+ public String toString() {
+     return this.name;
+ }
+
+ /**
+  * @return The flow sink reported by the controlled resource.
+  */
+ public IFlowSink<E> getFlowSink() {
+     final IFlowSink<E> sink = controllable.getFlowSink();
+     return sink;
+ }
+}
Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowDrain.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowDrain.java?rev=743476&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowDrain.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowDrain.java Wed Feb 11 20:12:28 2009
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.flow;
+
+/**
+ * Defines an interface for draining a flow source.
+ *
+ * @param <E>
+ *            The type of the elements being drained.
+ */
+public interface IFlowDrain<E> {
+
+    /**
+     * Used by a flow source that is being dispatched to drain its elements.
+     * The implementor is responsible for calling
+     * {@link ISourceController#elementDispatched(Object)} when the element has
+     * been dispatched to all downstream sinks.
+     * <p>
+     * NOTE(review): the original comment went on with a truncated condition
+     * ("if IFlowSource ..."); confirm when exactly this obligation applies.
+     *
+     * @param elem
+     *            The element to drain.
+     * @param controller
+     *            The source controller the element is drained from.
+     */
+    void drain(E elem, ISourceController<E> controller);
+}