You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2012/02/10 21:19:44 UTC
svn commit: r1242912 - in
/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm:
VMTransport.java VMTransportServer.java
Author: gtully
Date: Fri Feb 10 20:19:44 2012
New Revision: 1242912
URL: http://svn.apache.org/viewvc?rev=1242912&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3684 - resolve deadlock on blocked oneway, revise sync and lazy init, remove use of valve
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransportServer.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java?rev=1242912&r1=1242911&r2=1242912&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java Fri Feb 10 20:19:44 2012
@@ -26,20 +26,14 @@ import org.apache.activemq.command.Shutd
import org.apache.activemq.thread.DefaultThreadPools;
import org.apache.activemq.thread.Task;
import org.apache.activemq.thread.TaskRunner;
-import org.apache.activemq.thread.TaskRunnerFactory;
-import org.apache.activemq.thread.Valve;
import org.apache.activemq.transport.FutureResponse;
import org.apache.activemq.transport.ResponseCallback;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportDisposedIOException;
import org.apache.activemq.transport.TransportListener;
-import org.apache.activemq.util.IOExceptionSupport;
-
/**
* A Transport implementation that uses direct method invocations.
- *
- *
*/
public class VMTransport implements Transport, Task {
@@ -47,21 +41,23 @@ public class VMTransport implements Tran
private static final AtomicLong NEXT_ID = new AtomicLong(0);
protected VMTransport peer;
protected TransportListener transportListener;
- protected boolean disposed;
protected boolean marshal;
protected boolean network;
protected boolean async = true;
protected int asyncQueueDepth = 2000;
- protected LinkedBlockingQueue<Object> messageQueue;
- protected boolean started;
protected final URI location;
protected final long id;
- private TaskRunner taskRunner;
- private final Object lazyInitMutext = new Object();
- private final Valve enqueueValve = new Valve(true);
- protected final AtomicBoolean stopping = new AtomicBoolean();
+ protected LinkedBlockingQueue<Object> messageQueue = new LinkedBlockingQueue<Object>(this.asyncQueueDepth);
+ private TaskRunner taskRunner = DefaultThreadPools.getDefaultTaskRunnerFactory().createTaskRunner(this, "VMTransport: " + toString());
+
private volatile int receiveCounter;
-
+
+ // Managed Sate access protected by locks.
+ protected final AtomicBoolean stopping = new AtomicBoolean();
+ protected final AtomicBoolean started = new AtomicBoolean();
+ protected final AtomicBoolean starting = new AtomicBoolean();
+ protected final AtomicBoolean disposed = new AtomicBoolean();
+
public VMTransport(URI location) {
this.location = location;
this.id = NEXT_ID.getAndIncrement();
@@ -72,50 +68,52 @@ public class VMTransport implements Tran
}
public void oneway(Object command) throws IOException {
- if (disposed) {
+ if (disposed.get()) {
throw new TransportDisposedIOException("Transport disposed.");
}
if (peer == null) {
throw new IOException("Peer not connected.");
}
-
- TransportListener transportListener=null;
+ TransportListener transportListener = null;
try {
- // Disable the peer from changing his state while we try to enqueue onto him.
- peer.enqueueValve.increment();
-
- if (peer.disposed || peer.stopping.get()) {
+ if (peer.disposed.get() || peer.stopping.get()) {
throw new TransportDisposedIOException("Peer (" + peer.toString() + ") disposed.");
}
-
- if (peer.started) {
+
+ if (peer.started.get()) {
if (peer.async) {
- peer.getMessageQueue().put(command);
+ peer.messageQueue.put(command);
peer.wakeup();
} else {
transportListener = peer.transportListener;
}
} else {
- peer.getMessageQueue().put(command);
+ peer.messageQueue.put(command);
+ synchronized (peer.starting) {
+ if (peer.started.get() && !peer.messageQueue.isEmpty()) {
+ // we missed the pending dispatch during start
+ if (peer.async) {
+ peer.wakeup();
+ } else {
+ transportListener = peer.transportListener;
+ }
+ }
+ }
}
-
} catch (InterruptedException e) {
InterruptedIOException iioe = new InterruptedIOException(e.getMessage());
iioe.initCause(e);
throw iioe;
- } finally {
- // Allow the peer to change state again...
- peer.enqueueValve.decrement();
}
-
dispatch(peer, transportListener, command);
}
-
+
public void dispatch(VMTransport transport, TransportListener transportListener, Object command) {
- if( transportListener!=null ) {
- if( command == DISCONNECT ) {
- transportListener.onException(new TransportDisposedIOException("Peer (" + peer.toString() + ") disposed."));
+ if (transportListener != null) {
+ if (command == DISCONNECT) {
+ transportListener.onException(
+ new TransportDisposedIOException("Peer (" + peer.toString() + ") disposed."));
} else {
transport.receiveCounter++;
transportListener.onCommand(command);
@@ -124,135 +122,81 @@ public class VMTransport implements Tran
}
public void start() throws Exception {
- if (transportListener == null) {
- throw new IOException("TransportListener not set.");
- }
- try {
- enqueueValve.turnOff();
- if (messageQueue != null && !async) {
+
+ if (starting.compareAndSet(false, true)) {
+
+ if (transportListener == null) {
+ throw new IOException("TransportListener not set.");
+ }
+
+ // ensure there is no missed dispatch during start, sync with oneway
+ synchronized (peer.starting) {
Object command;
- while ((command = messageQueue.poll()) != null && !stopping.get() ) {
- receiveCounter++;
+ while ((command = messageQueue.poll()) != null && !stopping.get()) {
dispatch(this, transportListener, command);
}
+
+ if (!disposed.get()) {
+
+ started.set(true);
+
+ if (async) {
+ taskRunner.wakeup();
+ } else {
+ messageQueue.clear();
+ messageQueue = null;
+ taskRunner.shutdown();
+ taskRunner = null;
+ }
+ }
}
- started = true;
- wakeup();
- } finally {
- enqueueValve.turnOn();
- }
- // If we get stopped while starting up, then do the actual stop now
- // that the enqueueValve is back on.
- if( stopping.get() ) {
- stop();
}
}
public void stop() throws Exception {
- stopping.set(true);
-
- // If stop() is called while being start()ed.. then we can't stop until we return to the start() method.
- if( enqueueValve.isOn() ) {
-
+ if (disposed.compareAndSet(false, true)) {
+ stopping.set(true);
// let the peer know that we are disconnecting..
try {
peer.transportListener.onCommand(new ShutdownInfo());
} catch (Exception ignore) {
}
-
-
- TaskRunner tr = null;
- try {
- enqueueValve.turnOff();
- if (!disposed) {
- started = false;
- disposed = true;
- if (taskRunner != null) {
- tr = taskRunner;
- taskRunner = null;
- }
- }
- } finally {
- stopping.set(false);
- enqueueValve.turnOn();
+
+ if (messageQueue != null) {
+ messageQueue.clear();
}
- if (tr != null) {
- tr.shutdown(1000);
+ if (taskRunner != null) {
+ taskRunner.shutdown(1000);
+ taskRunner = null;
}
-
-
}
-
}
-
+
/**
* @see org.apache.activemq.thread.Task#iterate()
*/
public boolean iterate() {
-
- final TransportListener tl;
- try {
- // Disable changing the state variables while we are running...
- enqueueValve.increment();
- tl = transportListener;
- if (!started || disposed || tl == null || stopping.get()) {
- if( stopping.get() ) {
- // drain the queue it since folks could be blocked putting on to
- // it and that would not allow the stop() method for finishing up.
- getMessageQueue().clear();
- }
- return false;
- }
- } catch (InterruptedException e) {
+
+ if (disposed.get() || stopping.get()) {
return false;
- } finally {
- enqueueValve.decrement();
}
- LinkedBlockingQueue<Object> mq = getMessageQueue();
+ LinkedBlockingQueue<Object> mq = messageQueue;
Object command = mq.poll();
if (command != null) {
- if( command == DISCONNECT ) {
- tl.onException(new TransportDisposedIOException("Peer (" + peer.toString() + ") disposed."));
+ if (command == DISCONNECT) {
+ transportListener.onException(new TransportDisposedIOException("Peer (" + peer.toString() + ") disposed."));
} else {
- tl.onCommand(command);
+ transportListener.onCommand(command);
}
return !mq.isEmpty();
} else {
return false;
}
-
}
public void setTransportListener(TransportListener commandListener) {
- try {
- // enqueue can block on blocking queue, preventing turnOff
- // so avoid in that case: https://issues.apache.org/jira/browse/AMQ-3684
- if (async && getMessageQueue().remainingCapacity() == 0) {
- // enqueue blocked or will be
- this.transportListener = commandListener;
- wakeup();
- } else {
- try {
- enqueueValve.turnOff();
- this.transportListener = commandListener;
- wakeup();
- } finally {
- enqueueValve.turnOn();
- }
- }
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- }
-
- private LinkedBlockingQueue<Object> getMessageQueue() {
- synchronized (lazyInitMutext) {
- if (messageQueue == null) {
- messageQueue = new LinkedBlockingQueue<Object>(this.asyncQueueDepth);
- }
- return messageQueue;
- }
+ this.transportListener = commandListener;
}
public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) throws IOException {
@@ -336,11 +280,6 @@ public class VMTransport implements Tran
protected void wakeup() {
if (async) {
- synchronized (lazyInitMutext) {
- if (taskRunner == null) {
- taskRunner = DefaultThreadPools.getDefaultTaskRunnerFactory().createTaskRunner(this, "VMTransport: " + toString());
- }
- }
try {
taskRunner.wakeup();
} catch (InterruptedException e) {
@@ -353,16 +292,16 @@ public class VMTransport implements Tran
return false;
}
- public boolean isDisposed() {
- return disposed;
- }
-
- public boolean isConnected() {
- return started;
- }
+ public boolean isDisposed() {
+ return disposed.get();
+ }
- public void reconnect(URI uri) throws IOException {
- throw new IOException("Not supported");
+ public boolean isConnected() {
+ return started.get();
+ }
+
+ public void reconnect(URI uri) throws IOException {
+ throw new IOException("reconnection Not supported by this transport.");
}
public boolean isReconnectSupported() {
@@ -372,7 +311,8 @@ public class VMTransport implements Tran
public boolean isUpdateURIsSupported() {
return false;
}
- public void updateURIs(boolean reblance,URI[] uris) throws IOException {
+
+ public void updateURIs(boolean reblance, URI[] uris) throws IOException {
throw new IOException("Not supported");
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransportServer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransportServer.java?rev=1242912&r1=1242911&r2=1242912&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransportServer.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransportServer.java Fri Feb 10 20:19:44 2012
@@ -75,7 +75,7 @@ public class VMTransportServer implement
connectionCount.incrementAndGet();
VMTransport client = new VMTransport(location) {
public void stop() throws Exception {
- if (stopping.compareAndSet(false, true) && !disposed) {
+ if (stopping.compareAndSet(false, true) && !disposed.get()) {
super.stop();
if (connectionCount.decrementAndGet() == 0
&& disposeOnDisconnect) {