You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2012/02/10 21:19:44 UTC

svn commit: r1242912 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm: VMTransport.java VMTransportServer.java

Author: gtully
Date: Fri Feb 10 20:19:44 2012
New Revision: 1242912

URL: http://svn.apache.org/viewvc?rev=1242912&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3684 - resolve deadlock on blocked oneway, revise sync and lazy init, remove use of valve

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransportServer.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java?rev=1242912&r1=1242911&r2=1242912&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java Fri Feb 10 20:19:44 2012
@@ -26,20 +26,14 @@ import org.apache.activemq.command.Shutd
 import org.apache.activemq.thread.DefaultThreadPools;
 import org.apache.activemq.thread.Task;
 import org.apache.activemq.thread.TaskRunner;
-import org.apache.activemq.thread.TaskRunnerFactory;
-import org.apache.activemq.thread.Valve;
 import org.apache.activemq.transport.FutureResponse;
 import org.apache.activemq.transport.ResponseCallback;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportDisposedIOException;
 import org.apache.activemq.transport.TransportListener;
-import org.apache.activemq.util.IOExceptionSupport;
-
 
 /**
  * A Transport implementation that uses direct method invocations.
- * 
- * 
  */
 public class VMTransport implements Transport, Task {
 
@@ -47,21 +41,23 @@ public class VMTransport implements Tran
     private static final AtomicLong NEXT_ID = new AtomicLong(0);
     protected VMTransport peer;
     protected TransportListener transportListener;
-    protected boolean disposed;
     protected boolean marshal;
     protected boolean network;
     protected boolean async = true;
     protected int asyncQueueDepth = 2000;
-    protected LinkedBlockingQueue<Object> messageQueue;
-    protected boolean started;
     protected final URI location;
     protected final long id;
-    private TaskRunner taskRunner;
-    private final Object lazyInitMutext = new Object();
-    private final Valve enqueueValve = new Valve(true);
-    protected final AtomicBoolean stopping = new AtomicBoolean();
+    protected LinkedBlockingQueue<Object> messageQueue = new LinkedBlockingQueue<Object>(this.asyncQueueDepth);
+    private TaskRunner taskRunner = DefaultThreadPools.getDefaultTaskRunnerFactory().createTaskRunner(this, "VMTransport: " + toString());
+
     private volatile int receiveCounter;
-    
+
+    // Managed State access protected by locks.
+    protected final AtomicBoolean stopping = new AtomicBoolean();
+    protected final AtomicBoolean started = new AtomicBoolean();
+    protected final AtomicBoolean starting = new AtomicBoolean();
+    protected final AtomicBoolean disposed = new AtomicBoolean();
+
     public VMTransport(URI location) {
         this.location = location;
         this.id = NEXT_ID.getAndIncrement();
@@ -72,50 +68,52 @@ public class VMTransport implements Tran
     }
 
     public void oneway(Object command) throws IOException {
-        if (disposed) {
+        if (disposed.get()) {
             throw new TransportDisposedIOException("Transport disposed.");
         }
         if (peer == null) {
             throw new IOException("Peer not connected.");
         }
 
-        
-        TransportListener transportListener=null;
+        TransportListener transportListener = null;
         try {
-            // Disable the peer from changing his state while we try to enqueue onto him.
-            peer.enqueueValve.increment();
-        
-            if (peer.disposed || peer.stopping.get()) {
+            if (peer.disposed.get() || peer.stopping.get()) {
                 throw new TransportDisposedIOException("Peer (" + peer.toString() + ") disposed.");
             }
-            
-            if (peer.started) {
+
+            if (peer.started.get()) {
                 if (peer.async) {
-                    peer.getMessageQueue().put(command);
+                    peer.messageQueue.put(command);
                     peer.wakeup();
                 } else {
                     transportListener = peer.transportListener;
                 }
             } else {
-                peer.getMessageQueue().put(command);
+                peer.messageQueue.put(command);
+                synchronized (peer.starting) {
+                    if (peer.started.get() && !peer.messageQueue.isEmpty()) {
+                        // we missed the pending dispatch during start
+                        if (peer.async) {
+                            peer.wakeup();
+                        } else {
+                            transportListener = peer.transportListener;
+                        }
+                    }
+                }
             }
-            
         } catch (InterruptedException e) {
             InterruptedIOException iioe = new InterruptedIOException(e.getMessage());
             iioe.initCause(e);
             throw iioe;
-        } finally {
-            // Allow the peer to change state again...
-            peer.enqueueValve.decrement();
         }
-
         dispatch(peer, transportListener, command);
     }
-    
+
     public void dispatch(VMTransport transport, TransportListener transportListener, Object command) {
-        if( transportListener!=null ) {
-            if( command == DISCONNECT ) {
-                transportListener.onException(new TransportDisposedIOException("Peer (" + peer.toString() + ") disposed."));
+        if (transportListener != null) {
+            if (command == DISCONNECT) {
+                transportListener.onException(
+                        new TransportDisposedIOException("Peer (" + peer.toString() + ") disposed."));
             } else {
                 transport.receiveCounter++;
                 transportListener.onCommand(command);
@@ -124,135 +122,81 @@ public class VMTransport implements Tran
     }
 
     public void start() throws Exception {
-        if (transportListener == null) {
-            throw new IOException("TransportListener not set.");
-        }
-        try {
-            enqueueValve.turnOff();
-            if (messageQueue != null && !async) {
+
+        if (starting.compareAndSet(false, true)) {
+
+            if (transportListener == null) {
+                throw new IOException("TransportListener not set.");
+            }
+
+            // ensure there is no missed dispatch during start, sync with oneway
+            synchronized (peer.starting) {
                 Object command;
-                while ((command = messageQueue.poll()) != null && !stopping.get() ) {
-                    receiveCounter++;
+                while ((command = messageQueue.poll()) != null && !stopping.get()) {
                     dispatch(this, transportListener, command);
                 }
+
+                if (!disposed.get()) {
+
+                    started.set(true);
+
+                    if (async) {
+                        taskRunner.wakeup();
+                    } else {
+                        messageQueue.clear();
+                        messageQueue = null;
+                        taskRunner.shutdown();
+                        taskRunner = null;
+                    }
+                }
             }
-            started = true;
-            wakeup();
-        } finally {
-            enqueueValve.turnOn();
-        }
-        // If we get stopped while starting up, then do the actual stop now 
-        // that the enqueueValve is back on.
-        if( stopping.get() ) {
-            stop();
         }
     }
 
     public void stop() throws Exception {
-        stopping.set(true);
-        
-        // If stop() is called while being start()ed.. then we can't stop until we return to the start() method.
-        if( enqueueValve.isOn() ) {
-        	
+        if (disposed.compareAndSet(false, true)) {
+            stopping.set(true);
             // let the peer know that we are disconnecting..
             try {
                 peer.transportListener.onCommand(new ShutdownInfo());
             } catch (Exception ignore) {
             }
-        	
-        	
-            TaskRunner tr = null;
-            try {
-                enqueueValve.turnOff();
-                if (!disposed) {
-                    started = false;
-                    disposed = true;
-                    if (taskRunner != null) {
-                        tr = taskRunner;
-                        taskRunner = null;
-                    }
-                }
-            } finally {
-                stopping.set(false);
-                enqueueValve.turnOn();
+
+            if (messageQueue != null) {
+                messageQueue.clear();
             }
-            if (tr != null) {
-                tr.shutdown(1000);
+            if (taskRunner != null) {
+                taskRunner.shutdown(1000);
+                taskRunner = null;
             }
-            
-
         }
-        
     }
-    
+
     /**
      * @see org.apache.activemq.thread.Task#iterate()
      */
     public boolean iterate() {
-        
-        final TransportListener tl;
-        try {
-            // Disable changing the state variables while we are running... 
-            enqueueValve.increment();
-            tl = transportListener;
-            if (!started || disposed || tl == null || stopping.get()) {
-                if( stopping.get() ) {
-                    // drain the queue it since folks could be blocked putting on to
-                    // it and that would not allow the stop() method for finishing up.
-                    getMessageQueue().clear();  
-                }
-                return false;
-            }
-        } catch (InterruptedException e) {
+
+        if (disposed.get() || stopping.get()) {
             return false;
-        } finally {
-            enqueueValve.decrement();
         }
 
-        LinkedBlockingQueue<Object> mq = getMessageQueue();
+        LinkedBlockingQueue<Object> mq = messageQueue;
         Object command = mq.poll();
         if (command != null) {
-            if( command == DISCONNECT ) {
-                tl.onException(new TransportDisposedIOException("Peer (" + peer.toString() + ") disposed."));
+            if (command == DISCONNECT) {
+                transportListener.onException(new TransportDisposedIOException("Peer (" + peer.toString() + ") disposed."));
             } else {
-                tl.onCommand(command);
+                transportListener.onCommand(command);
             }
             return !mq.isEmpty();
         } else {
             return false;
         }
-        
     }
 
     public void setTransportListener(TransportListener commandListener) {
-        try {
-            // enqueue can block on blocking queue, preventing turnOff
-            // so avoid in that case: https://issues.apache.org/jira/browse/AMQ-3684
-            if (async && getMessageQueue().remainingCapacity() == 0) {
-                // enqueue blocked or will be
-                this.transportListener = commandListener;
-                wakeup();
-            } else {
-                try {
-                    enqueueValve.turnOff();
-                    this.transportListener = commandListener;
-                    wakeup();
-                } finally {
-                    enqueueValve.turnOn();
-                }
-            }
-        } catch (InterruptedException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    private LinkedBlockingQueue<Object> getMessageQueue() {
-        synchronized (lazyInitMutext) {
-            if (messageQueue == null) {
-                messageQueue = new LinkedBlockingQueue<Object>(this.asyncQueueDepth);
-            }
-            return messageQueue;
-        }
+        this.transportListener = commandListener;
     }
 
     public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) throws IOException {
@@ -336,11 +280,6 @@ public class VMTransport implements Tran
 
     protected void wakeup() {
         if (async) {
-            synchronized (lazyInitMutext) {
-                if (taskRunner == null) {
-                    taskRunner = DefaultThreadPools.getDefaultTaskRunnerFactory().createTaskRunner(this, "VMTransport: " + toString());
-                }
-            }
             try {
                 taskRunner.wakeup();
             } catch (InterruptedException e) {
@@ -353,16 +292,16 @@ public class VMTransport implements Tran
         return false;
     }
 
-	public boolean isDisposed() {
-		return disposed;
-	}
-	
-	public boolean isConnected() {
-	    return started;
-	}
+    public boolean isDisposed() {
+        return disposed.get();
+    }
 
-	public void reconnect(URI uri) throws IOException {
-        throw new IOException("Not supported");
+    public boolean isConnected() {
+        return started.get();
+    }
+
+    public void reconnect(URI uri) throws IOException {
+        throw new IOException("reconnection Not supported by this transport.");
     }
 
     public boolean isReconnectSupported() {
@@ -372,7 +311,8 @@ public class VMTransport implements Tran
     public boolean isUpdateURIsSupported() {
         return false;
     }
-    public void updateURIs(boolean reblance,URI[] uris) throws IOException {
+
+    public void updateURIs(boolean reblance, URI[] uris) throws IOException {
         throw new IOException("Not supported");
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransportServer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransportServer.java?rev=1242912&r1=1242911&r2=1242912&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransportServer.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransportServer.java Fri Feb 10 20:19:44 2012
@@ -75,7 +75,7 @@ public class VMTransportServer implement
         connectionCount.incrementAndGet();
         VMTransport client = new VMTransport(location) {
             public void stop() throws Exception {
-            	if (stopping.compareAndSet(false, true) && !disposed) {
+            	if (stopping.compareAndSet(false, true) && !disposed.get()) {
 					super.stop();
 					if (connectionCount.decrementAndGet() == 0
 							&& disposeOnDisconnect) {