You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/02/20 21:33:27 UTC
svn commit: r746363 - in /activemq/sandbox/activemq-flow/src:
main/java/org/apache/activemq/dispatch/ main/java/org/apache/activemq/flow/
test/java/org/apache/activemq/flow/
Author: chirino
Date: Fri Feb 20 20:33:26 2009
New Revision: 746363
URL: http://svn.apache.org/viewvc?rev=746363&view=rev
Log:
Applying colin's https://issues.apache.org/activemq/browse/AMQ-2132 patch.
Modified:
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/AbstractPooledDispatcher.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/IDispatcher.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityDispatcher.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/AbstractPooledDispatcher.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/AbstractPooledDispatcher.java?rev=746363&r1=746362&r2=746363&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/AbstractPooledDispatcher.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/AbstractPooledDispatcher.java Fri Feb 20 20:33:26 2009
@@ -1,12 +1,12 @@
package org.apache.activemq.dispatch;
import java.util.ArrayList;
+import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-
public abstract class AbstractPooledDispatcher<D extends IDispatcher> implements IDispatcher, PooledDispatcher<D> {
-
+
private final String name;
private final ThreadLocal<D> dispatcher = new ThreadLocal<D>();
@@ -17,7 +17,7 @@
final AtomicBoolean shutdown = new AtomicBoolean();
private int roundRobinCounter = 0;
- private final int size;
+ private int size;
protected ExecutionLoadBalancer<D> loadBalancer;
@@ -73,8 +73,6 @@
interrupted = true;
continue;
}
- dispatchers.remove(dispatchers.size() - 1);
-
}
// Re-interrupt:
if (interrupted) {
@@ -119,6 +117,11 @@
* A Dispatcher must call this when exiting its dispatch loop
*/
public void onDispatcherStopped(D d) {
+ synchronized (dispatchers) {
+ if (dispatchers.remove(d)) {
+ size--;
+ }
+ }
loadBalancer.removeDispatcher(d);
}
@@ -126,6 +129,10 @@
D d = dispatcher.get();
if (d == null) {
synchronized (dispatchers) {
+ if(dispatchers.isEmpty())
+ {
+ throw new RejectedExecutionException();
+ }
if (++roundRobinCounter >= size) {
roundRobinCounter = 0;
}
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/IDispatcher.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/IDispatcher.java?rev=746363&r1=746362&r2=746363&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/IDispatcher.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/IDispatcher.java Fri Feb 20 20:33:26 2009
@@ -17,6 +17,7 @@
package org.apache.activemq.dispatch;
import java.util.concurrent.Executor;
+import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
public interface IDispatcher {
@@ -48,8 +49,10 @@
* dispatch. The {@link Dispatchable} will remain in the dispatch queue
* until a subsequent call to {@link Dispatchable#dispatch()} returns
* false;
+ *
+ * @throws RejectedExecutionException If the dispatcher has been shut down.
*/
- public void requestDispatch();
+ public void requestDispatch() throws RejectedExecutionException;
/**
* This can be called to update the dispatch priority.
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityDispatcher.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityDispatcher.java?rev=746363&r1=746362&r2=746363&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityDispatcher.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityDispatcher.java Fri Feb 20 20:33:26 2009
@@ -17,6 +17,7 @@
package org.apache.activemq.dispatch;
import java.util.concurrent.Executor;
+import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -370,7 +371,7 @@
final UpdateEvent updateEvent[];
private final ExecutionTracker<D> tracker;
- private D currentOwner;
+ protected D currentOwner;
private D updateDispatcher = null;
private int priority;
@@ -433,6 +434,9 @@
public void requestDispatch() {
+ if (closed) {
+ throw new RejectedExecutionException();
+ }
D callingDispatcher = getCurrentDispatcher();
if (tracker != null)
tracker.onDispatchRequest(callingDispatcher, getCurrentDispatchContext());
@@ -536,17 +540,17 @@
}
public void close() {
- tracker.close();
D callingDispatcher = getCurrentDispatcher();
synchronized (this) {
closed = true;
-
// If the owner of this context is the calling thread, then
// delegate to the dispatcher.
if (currentOwner == callingDispatcher) {
if (isLinked()) {
unlink();
}
+ tracker.close();
+
// FIXME Deadlock potential!
synchronized (foreignQueue) {
if (updateEvent[foreignToggle].isLinked()) {
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java?rev=746363&r1=746362&r2=746363&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java Fri Feb 20 20:33:26 2009
@@ -20,6 +20,7 @@
import java.util.LinkedList;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
import org.apache.activemq.flow.IFlowLimiter.UnThrottleListener;
@@ -270,7 +271,7 @@
setUnThrottleListener();
}
}
- if( ok ) {
+ if (ok) {
controllable.flowElemAccepted(this, elem);
}
return ok;
@@ -318,7 +319,8 @@
waitForResume();
if (!blockedSources.contains(source)) {
-// System.out.println("BLOCKING : SINK[" + this + "], SOURCE[" + source + "]");
+ // System.out.println("BLOCKING : SINK[" + this + "], SOURCE[" +
+ // source + "]");
blockedSources.add(source);
source.onFlowBlock(this);
}
@@ -399,7 +401,9 @@
try {
Thread.currentThread().setName(name);
for (ISourceController<E> source : blockedSources) {
-// System.out.println("UNBLOCKING: SINK[" + FlowController.this + "], SOURCE[" + source + "]");
+ // System.out.println("UNBLOCKING: SINK[" +
+ // FlowController.this + "], SOURCE[" + source +
+ // "]");
source.onFlowResume(FlowController.this);
}
for (FlowUnblockListener<E> listener : unblockListeners) {
@@ -421,7 +425,12 @@
}
};
- RESUME_SERVICE.execute(resume);
+ try {
+ RESUME_SERVICE.execute(resume);
+ } catch (RejectedExecutionException ree) {
+ // Must be shutting down, ignore this, leaving resumeScheduled
+ // true
+ }
}
}
Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java?rev=746363&r1=746362&r2=746363&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java Fri Feb 20 20:33:26 2009
@@ -55,7 +55,7 @@
protected boolean tcp = false;
// set to force marshalling even in the NON tcp case.
protected boolean forceMarshalling = false;
-
+
protected String sendBrokerURI;
protected String receiveBrokerURI;
@@ -91,13 +91,13 @@
@Override
protected void setUp() throws Exception {
- dispatcher = PriorityDispatcher.createPriorityDispatchPool("BrokerDispatcher", Message.MAX_PRIORITY, asyncThreadPoolSize);
-
- if( tcp ) {
+ dispatcher = createDispatcher();
+ dispatcher.start();
+ if (tcp) {
sendBrokerURI = "tcp://localhost:10000?wireFormat=proto";
receiveBrokerURI = "tcp://localhost:20000?wireFormat=proto";
} else {
- if( forceMarshalling ) {
+ if (forceMarshalling) {
sendBrokerURI = "pipe://SendBroker?wireFormat=proto";
receiveBrokerURI = "pipe://ReceiveBroker?wireFormat=proto";
} else {
@@ -106,7 +106,11 @@
}
}
}
-
+
+ protected IDispatcher createDispatcher() {
+ return PriorityDispatcher.createPriorityDispatchPool("BrokerDispatcher", Message.MAX_PRIORITY, asyncThreadPoolSize);
+ }
+
public void test_1_1_0() throws Exception {
producerCount = 1;
destCount = 1;
@@ -121,7 +125,7 @@
stopServices();
}
}
-
+
public void test_1_1_1() throws Exception {
producerCount = 1;
destCount = 1;
@@ -264,7 +268,7 @@
stopServices();
}
}
-
+
/**
* Test sending with 1 high priority sender. The high priority sender should
* have higher throughput than the other low priority senders.
@@ -342,9 +346,9 @@
stopServices();
}
}
-
+
private void reportRates() throws InterruptedException {
- System.out.println("Checking rates for test: " + getName()+", "+(ptp?"ptp":"topic"));
+ System.out.println("Checking rates for test: " + getName() + ", " + (ptp ? "ptp" : "topic"));
for (int i = 0; i < PERFORMANCE_SAMPLES; i++) {
Period p = new Period();
Thread.sleep(1000 * 5);
@@ -390,7 +394,7 @@
RemoteProducer producer = createProducer(i, destination);
sendBroker.producers.add(producer);
}
-
+
for (int i = 0; i < consumerCount; i++) {
Destination destination = dests[i % destCount];
RemoteConsumer consumer = createConsumer(i, destination);
@@ -398,18 +402,18 @@
}
// Create MultiBroker connections:
-// if (multibroker) {
-// Pipe<Message> pipe = new Pipe<Message>();
-// sendBroker.createBrokerConnection(rcvBroker, pipe);
-// rcvBroker.createBrokerConnection(sendBroker, pipe.connect());
-// }
+ // if (multibroker) {
+ // Pipe<Message> pipe = new Pipe<Message>();
+ // sendBroker.createBrokerConnection(rcvBroker, pipe);
+ // rcvBroker.createBrokerConnection(sendBroker, pipe.connect());
+ // }
}
private RemoteConsumer createConsumer(int i, Destination destination) {
RemoteConsumer consumer = new RemoteConsumer();
consumer.setBroker(rcvBroker);
consumer.setDestination(destination);
- consumer.setName("consumer"+(i+1));
+ consumer.setName("consumer" + (i + 1));
consumer.setTotalConsumerRate(totalConsumerRate);
consumer.setDispatcher(dispatcher);
return consumer;
@@ -418,8 +422,8 @@
private RemoteProducer createProducer(int id, Destination destination) {
RemoteProducer producer = new RemoteProducer();
producer.setBroker(sendBroker);
- producer.setProducerId(id+1);
- producer.setName("producer" +(id+1));
+ producer.setProducerId(id + 1);
+ producer.setName("producer" + (id + 1));
producer.setDestination(destination);
producer.setMessageIdGenerator(msgIdGenerator);
producer.setTotalProducerRate(totalProducerRate);
@@ -432,7 +436,7 @@
queue.setBroker(broker);
queue.setDestination(destination);
queue.setKeyExtractor(KEY_MAPPER);
- if( usePartitionedQueue ) {
+ if (usePartitionedQueue) {
queue.setPartitionMapper(PARTITION_MAPPER);
}
return queue;