You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/02/27 20:53:09 UTC
svn commit: r748660 - in /activemq/sandbox/activemq-flow/src:
main/java/org/apache/activemq/dispatch/ main/java/org/apache/activemq/flow/
test/java/org/apache/activemq/flow/
Author: chirino
Date: Fri Feb 27 19:53:09 2009
New Revision: 748660
URL: http://svn.apache.org/viewvc?rev=748660&view=rev
Log:
Applying Colin's patch for https://issues.apache.org/activemq/browse/AMQ-2141
Modified:
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/IDispatcher.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityDispatcher.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/PipeTransportFactory.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConnection.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteProducer.java
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/IDispatcher.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/IDispatcher.java?rev=748660&r1=748659&r2=748660&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/IDispatcher.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/IDispatcher.java Fri Feb 27 19:53:09 2009
@@ -77,9 +77,10 @@
/**
* This must be called to release any resource the dispatcher is holding
- * on behalf of this context.
+ * on behalf of this context. Once called this {@link DispatchContext} should
+ * no longer be used.
*/
- public void close();
+ public void close(boolean sync);
}
public class RunnableAdapter implements Dispatchable, Runnable {
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityDispatcher.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityDispatcher.java?rev=748660&r1=748659&r2=748660&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityDispatcher.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityDispatcher.java Fri Feb 27 19:53:09 2009
@@ -16,6 +16,9 @@
*/
package org.apache.activemq.dispatch;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
@@ -35,12 +38,13 @@
protected boolean running = false;
private boolean threaded = false;
protected final int MAX_USER_PRIORITY;
+ protected final HashSet<PriorityDispatchContext> contexts = new HashSet<PriorityDispatchContext>();
// Set if this dispatcher is part of a dispatch pool:
protected final PooledDispatcher<D> pooledDispatcher;
// The local dispatch queue:
- private final PriorityLinkedList<PriorityDispatchContext> priorityQueue;
+ protected final PriorityLinkedList<PriorityDispatchContext> priorityQueue;
// Dispatch queue for requests from other threads:
private final LinkedNodeList<ForeignEvent>[] foreignQueue;
@@ -157,16 +161,35 @@
*
* @see org.apache.activemq.dispatch.IDispatcher#shutdown()
*/
- public synchronized void shutdown() throws InterruptedException {
- if (thread != null) {
- dispatch(new RunnableAdapter() {
- public void run() {
- running = false;
- }
- }, MAX_USER_PRIORITY + 1);
+ public void shutdown() throws InterruptedException {
+ Thread joinThread = null;
+ synchronized (this) {
+ if (thread != null) {
+ dispatch(new RunnableAdapter() {
+ public void run() {
+ running = false;
+ }
+ }, MAX_USER_PRIORITY + 1);
+ joinThread = thread;
+ thread = null;
+ }
+ }
+ if (joinThread != null) {
// thread.interrupt();
- thread.join();
- thread = null;
+ joinThread.join();
+ }
+ }
+
+ protected void cleanup() {
+ ArrayList<PriorityDispatchContext> toClose = null;
+ synchronized (this) {
+ running = false;
+ toClose = new ArrayList<PriorityDispatchContext>(contexts.size());
+ toClose.addAll(contexts);
+ }
+
+ for (PriorityDispatchContext context : toClose) {
+ context.close(false);
}
}
@@ -235,6 +258,7 @@
thrown.printStackTrace();
} finally {
pooledDispatcher.onDispatcherStopped((D) this);
+ cleanup();
}
}
@@ -261,7 +285,7 @@
foreignPermits.release();
}
- protected final void onForeignUdate(PriorityDispatchContext context) {
+ protected final void onForeignUpdate(PriorityDispatchContext context) {
synchronized (foreignQueue) {
ForeignEvent fe = context.updateEvent[foreignToggle];
@@ -283,14 +307,31 @@
if (context.updateEvent[1].isLinked()) {
context.updateEvent[1].unlink();
}
- if (context.isLinked()) {
- context.unlink();
- return true;
- }
}
+
+ if (context.isLinked()) {
+ context.unlink();
+ return true;
+ }
+
+ synchronized (this) {
+ contexts.remove(context);
+ }
+
return false;
}
+ protected final boolean takeOwnership(PriorityDispatchContext context) {
+ synchronized (this) {
+ if (running) {
+ contexts.add(context);
+ } else {
+ return false;
+ }
+ }
+ return true;
+ }
+
/*
* (non-Javadoc)
*
@@ -377,11 +418,13 @@
private int priority;
private boolean dispatchRequested = false;
private boolean closed = false;
+ final CountDownLatch closeLatch = new CountDownLatch(1);
protected PriorityDispatchContext(Dispatchable dispatchable, boolean persistent, String name) {
this.dispatchable = dispatchable;
this.name = name;
this.currentOwner = (D) PriorityDispatcher.this;
+ this.currentOwner.contexts.add(this);
if (persistent) {
this.tracker = pooledDispatcher.getLoadBalancer().createExecutionTracker((PooledDispatchContext<D>) this);
} else {
@@ -428,15 +471,14 @@
updateDispatcher = newDispatcher;
if (DEBUG)
System.out.println(getName() + " updating to " + updateDispatcher);
+
+ currentOwner.onForeignUpdate(this);
}
- currentOwner.onForeignUdate(this);
+
}
public void requestDispatch() {
- if (closed) {
- throw new RejectedExecutionException();
- }
D callingDispatcher = getCurrentDispatcher();
if (tracker != null)
tracker.onDispatchRequest(callingDispatcher, getCurrentDispatchContext());
@@ -449,6 +491,12 @@
// delegate to the dispatcher.
if (currentOwner == callingDispatcher) {
+ if (!currentOwner.running) {
+ // TODO In the event that the current dispatcher
+ // failed due to a runtime exception, we could
+ // try to switch to a new dispatcher.
+ throw new RejectedExecutionException();
+ }
if (!isLinked()) {
currentOwner.priorityQueue.add(this, listPrio);
}
@@ -456,12 +504,16 @@
}
dispatchRequested = true;
+ currentOwner.onForeignUpdate(this);
}
- // FIXME Thread safety!
- currentOwner.onForeignUdate(this);
}
public void updatePriority(int priority) {
+
+ if (closed) {
+ return;
+ }
+
if (this.priority == priority) {
return;
}
@@ -470,6 +522,9 @@
// Otherwise this is coming off another thread, so we need to
// synchronize to protect against ownership changes:
synchronized (this) {
+ if (closed) {
+ return;
+ }
this.priority = priority;
// If this is called by the owning dispatcher, then we go ahead
@@ -488,30 +543,34 @@
}
return;
}
+
+ currentOwner.onForeignUpdate(this);
}
- // FIXME Thread safety!
- currentOwner.onForeignUdate(this);
+
}
public void processForeignUpdates() {
- boolean ownerChange = false;
synchronized (this) {
if (closed) {
- close();
+ close(false);
return;
}
- if (updateDispatcher != null) {
+ if (updateDispatcher != null && updateDispatcher.takeOwnership(this)) {
if (DEBUG) {
System.out.println("Assigning " + getName() + " to " + updateDispatcher);
}
+
if (currentOwner.removeDispatchContext(this)) {
dispatchRequested = true;
}
+
+ updateDispatcher.onForeignUpdate(this);
+ switchedDispatcher(currentOwner, updateDispatcher);
currentOwner = updateDispatcher;
updateDispatcher = null;
- ownerChange = true;
+
} else {
updatePriority(priority);
@@ -521,10 +580,6 @@
}
}
}
-
- if (ownerChange) {
- currentOwner.onForeignUdate(this);
- }
}
/**
@@ -539,27 +594,40 @@
}
- public void close() {
+ public boolean isClosed() {
+ return closed;
+ }
+
+ public void close(boolean sync) {
D callingDispatcher = getCurrentDispatcher();
+ // System.out.println(this + "Closing");
synchronized (this) {
closed = true;
// If the owner of this context is the calling thread, then
// delegate to the dispatcher.
if (currentOwner == callingDispatcher) {
- if (isLinked()) {
- unlink();
- }
- tracker.close();
+ removeDispatchContext(this);
+ closeLatch.countDown();
+ return;
+ }
+ }
- // FIXME Deadlock potential!
- synchronized (foreignQueue) {
- if (updateEvent[foreignToggle].isLinked()) {
- updateEvent[foreignToggle].unlink();
- }
+ currentOwner.onForeignUpdate(this);
+ if (sync) {
+ boolean interrupted = false;
+ while (true) {
+ try {
+ closeLatch.await();
+ break;
+ } catch (InterruptedException e) {
+ interrupted = true;
}
}
+
+ if (interrupted) {
+ Thread.currentThread().interrupt();
+ }
}
- currentOwner.onForeignUdate(this);
}
public final String toString() {
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java?rev=748660&r1=748659&r2=748660&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java Fri Feb 27 19:53:09 2009
@@ -399,7 +399,6 @@
}
String was = Thread.currentThread().getName();
try {
- Thread.currentThread().setName(name);
for (ISourceController<E> source : blockedSources) {
// System.out.println("UNBLOCKING: SINK[" +
// FlowController.this + "], SOURCE[" + source +
@@ -420,7 +419,6 @@
resumeScheduled = false;
mutex.notifyAll();
}
- Thread.currentThread().setName(was);
}
}
};
Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java?rev=748660&r1=748659&r2=748660&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java Fri Feb 27 19:53:09 2009
@@ -32,7 +32,6 @@
import org.apache.activemq.metric.Period;
import org.apache.activemq.protobuf.AsciiBuffer;
import org.apache.activemq.queue.Mapper;
-import org.apache.activemq.transport.nio.SelectorManager;
public class MockBrokerTest extends TestCase {
@@ -110,7 +109,7 @@
protected IDispatcher createDispatcher() {
return PriorityDispatcher.createPriorityDispatchPool("BrokerDispatcher", Message.MAX_PRIORITY, asyncThreadPoolSize);
}
-
+
public void test_1_1_0() throws Exception {
producerCount = 1;
destCount = 1;
@@ -235,7 +234,7 @@
consumerCount = 2;
createConnections();
- rcvBroker.consumers.get(0).setThinkTime(5);
+ rcvBroker.consumers.get(0).setThinkTime(50);
// Start 'em up.
startServices();
@@ -451,19 +450,19 @@
}
private void stopServices() throws Exception {
- if (dispatcher != null) {
- dispatcher.shutdown();
- }
for (MockBroker broker : brokers) {
broker.stopServices();
}
+ if (dispatcher != null) {
+ dispatcher.shutdown();
+ }
}
private void startServices() throws Exception {
for (MockBroker broker : brokers) {
broker.startServices();
}
- SelectorManager.SINGLETON.setChannelExecutor(dispatcher.createPriorityExecutor(PRIORITY_LEVELS));
+ //SelectorManager.SINGLETON.setChannelExecutor(dispatcher.createPriorityExecutor(PRIORITY_LEVELS));
}
}
Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/PipeTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/PipeTransportFactory.java?rev=748660&r1=748659&r2=748660&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/PipeTransportFactory.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/PipeTransportFactory.java Fri Feb 27 19:53:09 2009
@@ -63,7 +63,7 @@
public void stop() throws Exception {
if (readContext != null) {
- readContext.close();
+ readContext.close(true);
} else {
stopping.set(true);
thread.join();
Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConnection.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConnection.java?rev=748660&r1=748659&r2=748660&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConnection.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConnection.java Fri Feb 27 19:53:09 2009
@@ -3,6 +3,7 @@
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.dispatch.IDispatcher;
@@ -39,7 +40,7 @@
private final int outputResumeThreshold = 900;
private final int inputWindowSize = 1000;
- private final int inputResumeThreshold = 900;
+ private final int inputResumeThreshold = 500;
private IDispatcher dispatcher;
private final AtomicBoolean stopping = new AtomicBoolean();
@@ -181,17 +182,21 @@
onException(e);
}
} else {
- blockingWriter.execute(new Runnable() {
- public void run() {
- if (!stopping.get()) {
- try {
- transport.oneway(o);
- } catch (IOException e) {
- onException(e);
+ try {
+ blockingWriter.execute(new Runnable() {
+ public void run() {
+ if (!stopping.get()) {
+ try {
+ transport.oneway(o);
+ } catch (IOException e) {
+ onException(e);
+ }
}
}
- }
- });
+ });
+ } catch (RejectedExecutionException re) {
+ //Must be shutting down.
+ }
}
}
}
Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteProducer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteProducer.java?rev=748660&r1=748659&r2=748660&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteProducer.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteProducer.java Fri Feb 27 19:53:09 2009
@@ -65,7 +65,7 @@
public void stop() throws Exception
{
- dispatchContext.close();
+ dispatchContext.close(false);
super.stop();
}