You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2007/01/10 20:40:33 UTC
svn commit: r494950 -
/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
Author: rajdavies
Date: Wed Jan 10 11:40:33 2007
New Revision: 494950
URL: http://svn.apache.org/viewvc?view=rev&rev=494950
Log:
Tidied up the async dispatch option
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java?view=diff&rev=494950&r1=494949&r2=494950
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java Wed Jan 10 11:40:33 2007
@@ -11,6 +11,7 @@
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
+
package org.apache.activemq.transport.vm;
import java.io.IOException;
@@ -19,6 +20,7 @@
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
+import org.apache.activemq.broker.BrokerStoppedException;
import org.apache.activemq.command.Command;
import org.apache.activemq.thread.Task;
import org.apache.activemq.thread.TaskRunner;
@@ -32,16 +34,18 @@
import org.apache.commons.logging.LogFactory;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
+
/**
* A Transport implementation that uses direct method invocations.
*
* @version $Revision$
*/
public class VMTransport implements Transport,Task{
+
private static final Log log=LogFactory.getLog(VMTransport.class);
private static final AtomicLong nextId=new AtomicLong(0);
private static final TaskRunnerFactory taskRunnerFactory=new TaskRunnerFactory("VMTransport",Thread.NORM_PRIORITY,
- true,1000);
+ true,1000);
protected VMTransport peer;
protected TransportListener transportListener;
protected boolean disposed;
@@ -51,7 +55,7 @@
protected boolean started=false;
protected int asyncQueueDepth=2000;
protected List prePeerSetQueue=Collections.synchronizedList(new LinkedList());
- protected LinkedBlockingQueue messageQueue;
+ protected LinkedBlockingQueue messageQueue=null;
protected final URI location;
protected final long id;
private TaskRunner taskRunner;
@@ -76,9 +80,8 @@
if(peer==null)
throw new IOException("Peer not connected.");
if(!peer.disposed){
-
if(async){
- asyncOneWay(command);
+ asyncOneWay(command);
}else{
syncOneWay(command);
}
@@ -86,7 +89,7 @@
throw new TransportDisposedIOException("Peer ("+peer.toString()+") disposed.");
}
}
-
+
protected void syncOneWay(Object command){
final TransportListener tl=peer.transportListener;
prePeerSetQueue=peer.prePeerSetQueue;
@@ -96,10 +99,12 @@
tl.onCommand(command);
}
}
-
- protected void asyncOneWay(Object command) throws IOException{
- messageQueue=getMessageQueue();
+
+ protected synchronized void asyncOneWay(Object command) throws IOException{
try{
+ if(messageQueue==null){
+ messageQueue=new LinkedBlockingQueue(this.asyncQueueDepth);
+ }
messageQueue.put(command);
wakeup();
}catch(final InterruptedException e){
@@ -136,17 +141,17 @@
throw new IOException("TransportListener not set.");
if(!async){
for(Iterator iter=prePeerSetQueue.iterator();iter.hasNext();){
- Command command=(Command) iter.next();
+ Command command=(Command)iter.next();
transportListener.onCommand(command);
iter.remove();
}
}else{
- wakeup();
peer.wakeup();
+ wakeup();
}
}
- public void stop() throws Exception{
+ public synchronized void stop() throws Exception{
started=false;
if(!disposed){
disposed=true;
@@ -196,11 +201,17 @@
*/
public boolean iterate(){
final TransportListener tl=peer.transportListener;
- if(!messageQueue.isEmpty()&&!peer.disposed&&tl!=null){
- final Command command=(Command) messageQueue.poll();
- tl.onCommand(command);
+ Command command=null;
+ // if(!disposed && !messageQueue.isEmpty()&&!peer.disposed&&tl!=null){
+ synchronized(this){
+ if(messageQueue!=null&&!disposed&&!peer.disposed&&tl!=null &&!messageQueue.isEmpty()){
+ command=(Command)messageQueue.poll();
+ if (command != null) {
+ tl.onCommand(command);
+ }
+ }
}
- return !messageQueue.isEmpty()&&!peer.disposed&&!(peer.transportListener==null);
+ return messageQueue!=null&&!messageQueue.isEmpty()&&!peer.disposed;
}
/**
@@ -231,8 +242,8 @@
this.asyncQueueDepth=asyncQueueDepth;
}
- protected void wakeup(){
- if(async&&messageQueue!=null&&!messageQueue.isEmpty()){
+ protected synchronized void wakeup(){
+ if(async){
if(taskRunner==null){
taskRunner=taskRunnerFactory.createTaskRunner(this,"VMTransport: "+toString());
}
@@ -242,12 +253,5 @@
Thread.currentThread().interrupt();
}
}
- }
-
- protected synchronized LinkedBlockingQueue getMessageQueue(){
- if(messageQueue==null){
- messageQueue=new LinkedBlockingQueue(this.asyncQueueDepth);
- }
- return messageQueue;
}
}