You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2007/01/10 20:40:33 UTC

svn commit: r494950 - /incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java

Author: rajdavies
Date: Wed Jan 10 11:40:33 2007
New Revision: 494950

URL: http://svn.apache.org/viewvc?view=rev&rev=494950
Log:
Tidied up the async dispatch option

Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java?view=diff&rev=494950&r1=494949&r2=494950
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java Wed Jan 10 11:40:33 2007
@@ -11,6 +11,7 @@
  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  * specific language governing permissions and limitations under the License.
  */
+
 package org.apache.activemq.transport.vm;
 
 import java.io.IOException;
@@ -19,6 +20,7 @@
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
+import org.apache.activemq.broker.BrokerStoppedException;
 import org.apache.activemq.command.Command;
 import org.apache.activemq.thread.Task;
 import org.apache.activemq.thread.TaskRunner;
@@ -32,16 +34,18 @@
 import org.apache.commons.logging.LogFactory;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicLong;
+
 /**
  * A Transport implementation that uses direct method invocations.
  * 
  * @version $Revision$
  */
 public class VMTransport implements Transport,Task{
+
     private static final Log log=LogFactory.getLog(VMTransport.class);
     private static final AtomicLong nextId=new AtomicLong(0);
     private static final TaskRunnerFactory taskRunnerFactory=new TaskRunnerFactory("VMTransport",Thread.NORM_PRIORITY,
-                    true,1000);
+            true,1000);
     protected VMTransport peer;
     protected TransportListener transportListener;
     protected boolean disposed;
@@ -51,7 +55,7 @@
     protected boolean started=false;
     protected int asyncQueueDepth=2000;
     protected List prePeerSetQueue=Collections.synchronizedList(new LinkedList());
-    protected LinkedBlockingQueue messageQueue;
+    protected LinkedBlockingQueue messageQueue=null;
     protected final URI location;
     protected final long id;
     private TaskRunner taskRunner;
@@ -76,9 +80,8 @@
         if(peer==null)
             throw new IOException("Peer not connected.");
         if(!peer.disposed){
-           
             if(async){
-               asyncOneWay(command); 
+                asyncOneWay(command);
             }else{
                 syncOneWay(command);
             }
@@ -86,7 +89,7 @@
             throw new TransportDisposedIOException("Peer ("+peer.toString()+") disposed.");
         }
     }
-    
+
     protected void syncOneWay(Object command){
         final TransportListener tl=peer.transportListener;
         prePeerSetQueue=peer.prePeerSetQueue;
@@ -96,10 +99,12 @@
             tl.onCommand(command);
         }
     }
-    
-    protected void asyncOneWay(Object command) throws IOException{
-        messageQueue=getMessageQueue();
+
+    protected synchronized void asyncOneWay(Object command) throws IOException{
         try{
+            if(messageQueue==null){
+                messageQueue=new LinkedBlockingQueue(this.asyncQueueDepth);
+            }
             messageQueue.put(command);
             wakeup();
         }catch(final InterruptedException e){
@@ -136,17 +141,17 @@
             throw new IOException("TransportListener not set.");
         if(!async){
             for(Iterator iter=prePeerSetQueue.iterator();iter.hasNext();){
-                Command command=(Command) iter.next();
+                Command command=(Command)iter.next();
                 transportListener.onCommand(command);
                 iter.remove();
             }
         }else{
-            wakeup();
             peer.wakeup();
+            wakeup();
         }
     }
 
-    public void stop() throws Exception{
+    public synchronized void stop() throws Exception{
         started=false;
         if(!disposed){
             disposed=true;
@@ -196,11 +201,17 @@
      */
     public boolean iterate(){
         final TransportListener tl=peer.transportListener;
-        if(!messageQueue.isEmpty()&&!peer.disposed&&tl!=null){
-            final Command command=(Command) messageQueue.poll();
-            tl.onCommand(command);
+        Command command=null;
+        // if(!disposed && !messageQueue.isEmpty()&&!peer.disposed&&tl!=null){
+        synchronized(this){
+            if(messageQueue!=null&&!disposed&&!peer.disposed&&tl!=null &&!messageQueue.isEmpty()){
+                command=(Command)messageQueue.poll();
+                if (command != null) {
+                    tl.onCommand(command);
+                }
+            }
         }
-        return !messageQueue.isEmpty()&&!peer.disposed&&!(peer.transportListener==null);
+        return messageQueue!=null&&!messageQueue.isEmpty()&&!peer.disposed;
     }
 
     /**
@@ -231,8 +242,8 @@
         this.asyncQueueDepth=asyncQueueDepth;
     }
 
-    protected void wakeup(){
-        if(async&&messageQueue!=null&&!messageQueue.isEmpty()){
+    protected synchronized void wakeup(){
+        if(async){
             if(taskRunner==null){
                 taskRunner=taskRunnerFactory.createTaskRunner(this,"VMTransport: "+toString());
             }
@@ -242,12 +253,5 @@
                 Thread.currentThread().interrupt();
             }
         }
-    }
-
-    protected synchronized LinkedBlockingQueue getMessageQueue(){
-        if(messageQueue==null){
-            messageQueue=new LinkedBlockingQueue(this.asyncQueueDepth);
-        }
-        return messageQueue;
     }
 }