You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2007/03/08 11:22:23 UTC

svn commit: r515999 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq: thread/DedicatedTaskRunner.java thread/PooledTaskRunner.java thread/TaskRunner.java transport/vm/VMTransport.java

Author: rajdavies
Date: Thu Mar  8 02:22:22 2007
New Revision: 515999

URL: http://svn.apache.org/viewvc?view=rev&rev=515999
Log:
use async dispatch for vm:// transport by default

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/DedicatedTaskRunner.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/PooledTaskRunner.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunner.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/DedicatedTaskRunner.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/DedicatedTaskRunner.java?view=diff&rev=515999&r1=515998&r2=515999
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/DedicatedTaskRunner.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/DedicatedTaskRunner.java Thu Mar  8 02:22:22 2007
@@ -58,9 +58,10 @@
 
     /**
      * shut down the task
+     * @param timeout 
      * @throws InterruptedException 
      */
-    public void shutdown() throws InterruptedException{
+    public void shutdown(long timeout) throws InterruptedException{
         synchronized(mutex){
             shutdown=true;
             pending=true;
@@ -68,10 +69,18 @@
 
             // Wait till the thread stops.
             if(!threadTerminated){
-                mutex.wait();
+                mutex.wait(timeout);
             }
         }
-    }        
+    }      
+    
+    /**
+     * shut down the task
+     * @throws InterruptedException 
+     */
+    public void shutdown() throws InterruptedException{
+        shutdown(0);
+    }
     
     private void runTask() {
         

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/PooledTaskRunner.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/PooledTaskRunner.java?view=diff&rev=515999&r1=515998&r2=515999
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/PooledTaskRunner.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/PooledTaskRunner.java Thu Mar  8 02:22:22 2007
@@ -77,7 +77,7 @@
      * shut down the task
      * @throws InterruptedException 
      */
-    public void shutdown() throws InterruptedException{
+    public void shutdown(long timeout) throws InterruptedException{
         synchronized(runable){
             shutdown=true;
             //the check on the thread is done
@@ -85,13 +85,17 @@
             //shutDown() being called, which would wait forever
             //waiting for iterating to finish
             if(runningThread!=Thread.currentThread()){
-                while(iterating==true){
-                    runable.wait();
+                if(iterating==true){
+                    runable.wait(timeout);
                 }
             }
         }
     }        
     
+    
+    public void shutdown() throws InterruptedException {
+        shutdown(0);
+    }
     private void runTask() {
         
         synchronized (runable) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunner.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunner.java?view=diff&rev=515999&r1=515998&r2=515999
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunner.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunner.java Thu Mar  8 02:22:22 2007
@@ -25,4 +25,5 @@
 public interface TaskRunner {
     public abstract void wakeup() throws InterruptedException;
     public abstract void shutdown() throws InterruptedException;
+    public abstract void shutdown(long timeout) throws InterruptedException;
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java?view=diff&rev=515999&r1=515998&r2=515999
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java Thu Mar  8 02:22:22 2007
@@ -21,6 +21,7 @@
 import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.activemq.command.Command;
 import org.apache.activemq.thread.Task;
@@ -50,26 +51,31 @@
     protected boolean disposed;
     protected boolean marshal;
     protected boolean network;
-    protected boolean async=false;
-    protected boolean started=false;
+    protected boolean async=true;
+    protected AtomicBoolean started=new AtomicBoolean();
     protected int asyncQueueDepth=2000;
     protected List prePeerSetQueue=Collections.synchronizedList(new LinkedList());
     protected LinkedBlockingQueue messageQueue=null;
     protected final URI location;
     protected final long id;
     private TaskRunner taskRunner;
+    private final Object mutex=new Object();
 
     public VMTransport(URI location){
         this.location=location;
         this.id=nextId.getAndIncrement();
     }
 
-    synchronized public VMTransport getPeer(){
-        return peer;
+    public VMTransport getPeer(){
+        synchronized(mutex){
+            return peer;
+        }
     }
 
-    synchronized public void setPeer(VMTransport peer){
-        this.peer=peer;
+    public void setPeer(VMTransport peer){
+        synchronized(mutex){
+            this.peer=peer;
+        }
     }
 
     public void oneway(Object command) throws IOException{
@@ -99,10 +105,12 @@
         }
     }
 
-    protected synchronized void asyncOneWay(Object command) throws IOException{
+    protected void asyncOneWay(Object command) throws IOException{
         try{
-            if(messageQueue==null){
-                messageQueue=new LinkedBlockingQueue(this.asyncQueueDepth);
+            synchronized(mutex){
+                if(messageQueue==null){
+                    messageQueue=new LinkedBlockingQueue(this.asyncQueueDepth);
+                }
             }
             messageQueue.put(command);
             wakeup();
@@ -124,40 +132,46 @@
         throw new AssertionError("Unsupported Method");
     }
 
-    public synchronized TransportListener getTransportListener(){
-        return transportListener;
+    public TransportListener getTransportListener(){
+        synchronized(mutex){
+            return transportListener;
+        }
     }
 
-    synchronized public void setTransportListener(TransportListener commandListener){
-        this.transportListener=commandListener;
+    public void setTransportListener(TransportListener commandListener){
+        synchronized(mutex){
+            this.transportListener=commandListener;
+        }
         wakeup();
         peer.wakeup();
     }
 
-    public synchronized void start() throws Exception{
-        started=true;
-        if(transportListener==null)
-            throw new IOException("TransportListener not set.");
-        if(!async){
-            for(Iterator iter=prePeerSetQueue.iterator();iter.hasNext();){
-                Command command=(Command)iter.next();
-                transportListener.onCommand(command);
-                iter.remove();
+    public void start() throws Exception{
+        if(started.compareAndSet(false,true)){
+            if(transportListener==null)
+                throw new IOException("TransportListener not set.");
+            if(!async){
+                for(Iterator iter=prePeerSetQueue.iterator();iter.hasNext();){
+                    Command command=(Command)iter.next();
+                    transportListener.onCommand(command);
+                    iter.remove();
+                }
+            }else{
+                peer.wakeup();
+                wakeup();
             }
-        }else{
-            peer.wakeup();
-            wakeup();
         }
     }
 
-    public synchronized void stop() throws Exception{
-        started=false;
-        if(!disposed){
-            disposed=true;
-        }
-        if(taskRunner!=null){
-            taskRunner.shutdown();
-            taskRunner=null;
+    public void stop() throws Exception{
+        if(started.compareAndSet(true,false)){
+            if(!disposed){
+                disposed=true;
+            }
+            if(taskRunner!=null){
+                taskRunner.shutdown(1000);
+                taskRunner=null;
+            }
         }
     }
 
@@ -201,16 +215,16 @@
     public boolean iterate(){
         final TransportListener tl=peer.transportListener;
         Command command=null;
-        // if(!disposed && !messageQueue.isEmpty()&&!peer.disposed&&tl!=null){
-        synchronized(this){
-            if(messageQueue!=null&&!disposed&&!peer.disposed&&tl!=null &&!messageQueue.isEmpty()){
+        synchronized(mutex){
+            if(messageQueue!=null&&!disposed&&!peer.disposed&&tl!=null&&!messageQueue.isEmpty()){
                 command=(Command)messageQueue.poll();
-                if (command != null) {
-                    tl.onCommand(command);
-                }
             }
         }
-        return messageQueue!=null&&!messageQueue.isEmpty()&&!peer.disposed;
+        if(tl!=null&&command!=null){
+            tl.onCommand(command);
+        }
+        boolean result=messageQueue!=null&&!messageQueue.isEmpty()&&!peer.disposed;
+        return result;
     }
 
     /**
@@ -241,10 +255,12 @@
         this.asyncQueueDepth=asyncQueueDepth;
     }
 
-    protected synchronized void wakeup(){
+    protected void wakeup(){
         if(async){
-            if(taskRunner==null){
-                taskRunner=taskRunnerFactory.createTaskRunner(this,"VMTransport: "+toString());
+            synchronized(mutex){
+                if(taskRunner==null){
+                    taskRunner=taskRunnerFactory.createTaskRunner(this,"VMTransport: "+toString());
+                }
             }
             try{
                 taskRunner.wakeup();