You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2007/03/08 11:22:23 UTC
svn commit: r515999 - in
/activemq/trunk/activemq-core/src/main/java/org/apache/activemq:
thread/DedicatedTaskRunner.java thread/PooledTaskRunner.java
thread/TaskRunner.java transport/vm/VMTransport.java
Author: rajdavies
Date: Thu Mar 8 02:22:22 2007
New Revision: 515999
URL: http://svn.apache.org/viewvc?view=rev&rev=515999
Log:
Use async dispatch for the vm:// transport by default.
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/DedicatedTaskRunner.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/PooledTaskRunner.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunner.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/DedicatedTaskRunner.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/DedicatedTaskRunner.java?view=diff&rev=515999&r1=515998&r2=515999
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/DedicatedTaskRunner.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/DedicatedTaskRunner.java Thu Mar 8 02:22:22 2007
@@ -58,9 +58,10 @@
/**
* shut down the task
+ * @param timeout
* @throws InterruptedException
*/
- public void shutdown() throws InterruptedException{
+ public void shutdown(long timeout) throws InterruptedException{
synchronized(mutex){
shutdown=true;
pending=true;
@@ -68,10 +69,18 @@
// Wait till the thread stops.
if(!threadTerminated){
- mutex.wait();
+ mutex.wait(timeout);
}
}
- }
+ }
+
+ /**
+ * shut down the task
+ * @throws InterruptedException
+ */
+ public void shutdown() throws InterruptedException{
+ shutdown(0);
+ }
private void runTask() {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/PooledTaskRunner.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/PooledTaskRunner.java?view=diff&rev=515999&r1=515998&r2=515999
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/PooledTaskRunner.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/PooledTaskRunner.java Thu Mar 8 02:22:22 2007
@@ -77,7 +77,7 @@
* shut down the task
* @throws InterruptedException
*/
- public void shutdown() throws InterruptedException{
+ public void shutdown(long timeout) throws InterruptedException{
synchronized(runable){
shutdown=true;
//the check on the thread is done
@@ -85,13 +85,17 @@
//shutDown() being called, which would wait forever
//waiting for iterating to finish
if(runningThread!=Thread.currentThread()){
- while(iterating==true){
- runable.wait();
+ if(iterating==true){
+ runable.wait(timeout);
}
}
}
}
+
+ public void shutdown() throws InterruptedException {
+ shutdown(0);
+ }
private void runTask() {
synchronized (runable) {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunner.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunner.java?view=diff&rev=515999&r1=515998&r2=515999
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunner.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunner.java Thu Mar 8 02:22:22 2007
@@ -25,4 +25,5 @@
public interface TaskRunner {
public abstract void wakeup() throws InterruptedException;
public abstract void shutdown() throws InterruptedException;
+ public abstract void shutdown(long timeout) throws InterruptedException;
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java?view=diff&rev=515999&r1=515998&r2=515999
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java Thu Mar 8 02:22:22 2007
@@ -21,6 +21,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.command.Command;
import org.apache.activemq.thread.Task;
@@ -50,26 +51,31 @@
protected boolean disposed;
protected boolean marshal;
protected boolean network;
- protected boolean async=false;
- protected boolean started=false;
+ protected boolean async=true;
+ protected AtomicBoolean started=new AtomicBoolean();
protected int asyncQueueDepth=2000;
protected List prePeerSetQueue=Collections.synchronizedList(new LinkedList());
protected LinkedBlockingQueue messageQueue=null;
protected final URI location;
protected final long id;
private TaskRunner taskRunner;
+ private final Object mutex=new Object();
public VMTransport(URI location){
this.location=location;
this.id=nextId.getAndIncrement();
}
- synchronized public VMTransport getPeer(){
- return peer;
+ public VMTransport getPeer(){
+ synchronized(mutex){
+ return peer;
+ }
}
- synchronized public void setPeer(VMTransport peer){
- this.peer=peer;
+ public void setPeer(VMTransport peer){
+ synchronized(mutex){
+ this.peer=peer;
+ }
}
public void oneway(Object command) throws IOException{
@@ -99,10 +105,12 @@
}
}
- protected synchronized void asyncOneWay(Object command) throws IOException{
+ protected void asyncOneWay(Object command) throws IOException{
try{
- if(messageQueue==null){
- messageQueue=new LinkedBlockingQueue(this.asyncQueueDepth);
+ synchronized(mutex){
+ if(messageQueue==null){
+ messageQueue=new LinkedBlockingQueue(this.asyncQueueDepth);
+ }
}
messageQueue.put(command);
wakeup();
@@ -124,40 +132,46 @@
throw new AssertionError("Unsupported Method");
}
- public synchronized TransportListener getTransportListener(){
- return transportListener;
+ public TransportListener getTransportListener(){
+ synchronized(mutex){
+ return transportListener;
+ }
}
- synchronized public void setTransportListener(TransportListener commandListener){
- this.transportListener=commandListener;
+ public void setTransportListener(TransportListener commandListener){
+ synchronized(mutex){
+ this.transportListener=commandListener;
+ }
wakeup();
peer.wakeup();
}
- public synchronized void start() throws Exception{
- started=true;
- if(transportListener==null)
- throw new IOException("TransportListener not set.");
- if(!async){
- for(Iterator iter=prePeerSetQueue.iterator();iter.hasNext();){
- Command command=(Command)iter.next();
- transportListener.onCommand(command);
- iter.remove();
+ public void start() throws Exception{
+ if(started.compareAndSet(false,true)){
+ if(transportListener==null)
+ throw new IOException("TransportListener not set.");
+ if(!async){
+ for(Iterator iter=prePeerSetQueue.iterator();iter.hasNext();){
+ Command command=(Command)iter.next();
+ transportListener.onCommand(command);
+ iter.remove();
+ }
+ }else{
+ peer.wakeup();
+ wakeup();
}
- }else{
- peer.wakeup();
- wakeup();
}
}
- public synchronized void stop() throws Exception{
- started=false;
- if(!disposed){
- disposed=true;
- }
- if(taskRunner!=null){
- taskRunner.shutdown();
- taskRunner=null;
+ public void stop() throws Exception{
+ if(started.compareAndSet(true,false)){
+ if(!disposed){
+ disposed=true;
+ }
+ if(taskRunner!=null){
+ taskRunner.shutdown(1000);
+ taskRunner=null;
+ }
}
}
@@ -201,16 +215,16 @@
public boolean iterate(){
final TransportListener tl=peer.transportListener;
Command command=null;
- // if(!disposed && !messageQueue.isEmpty()&&!peer.disposed&&tl!=null){
- synchronized(this){
- if(messageQueue!=null&&!disposed&&!peer.disposed&&tl!=null &&!messageQueue.isEmpty()){
+ synchronized(mutex){
+ if(messageQueue!=null&&!disposed&&!peer.disposed&&tl!=null&&!messageQueue.isEmpty()){
command=(Command)messageQueue.poll();
- if (command != null) {
- tl.onCommand(command);
- }
}
}
- return messageQueue!=null&&!messageQueue.isEmpty()&&!peer.disposed;
+ if(tl!=null&&command!=null){
+ tl.onCommand(command);
+ }
+ boolean result=messageQueue!=null&&!messageQueue.isEmpty()&&!peer.disposed;
+ return result;
}
/**
@@ -241,10 +255,12 @@
this.asyncQueueDepth=asyncQueueDepth;
}
- protected synchronized void wakeup(){
+ protected void wakeup(){
if(async){
- if(taskRunner==null){
- taskRunner=taskRunnerFactory.createTaskRunner(this,"VMTransport: "+toString());
+ synchronized(mutex){
+ if(taskRunner==null){
+ taskRunner=taskRunnerFactory.createTaskRunner(this,"VMTransport: "+toString());
+ }
}
try{
taskRunner.wakeup();