You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/05/22 21:24:47 UTC
svn commit: r777666 -
/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
Author: chirino
Date: Fri May 22 19:24:46 2009
New Revision: 777666
URL: http://svn.apache.org/viewvc?rev=777666&view=rev
Log:
rolling back change committed in rev 777209
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java?rev=777666&r1=777665&r2=777666&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java Fri May 22 19:24:46 2009
@@ -29,9 +29,7 @@
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Map;
-import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
@@ -121,9 +119,6 @@
private Boolean tcpNoDelay;
private Thread runnerThread;
- private final ArrayBlockingQueue<Object> outbound = new ArrayBlockingQueue<Object>(100);
- private Thread onewayThread;
-
/**
* Connect to a remote Node - e.g. a Broker
*
@@ -162,37 +157,14 @@
this.localLocation = null;
setDaemon(true);
}
-
+
/**
* A one way asynchronous send
*/
public void oneway(Object command) throws IOException {
checkStarted();
- try {
- outbound.put(command);
- } catch (InterruptedException e) {
- throw new InterruptedIOException();
- }
- }
-
- protected void sendOneways() {
- try {
- while(!isStopped()) {
- Object command = outbound.poll(500, TimeUnit.MILLISECONDS);
- if( command!=null ) {
- try {
- while( command!=null ) {
- wireFormat.marshal(command, dataOut);
- command = outbound.poll();
- }
- dataOut.flush();
- } catch (IOException e) {
- getTransportListener().onException(e);
- }
- }
- }
- } catch (InterruptedException e) {
- }
+ wireFormat.marshal(command, dataOut);
+ dataOut.flush();
}
/**
@@ -427,11 +399,6 @@
protected void doStart() throws Exception {
connect();
- onewayThread = new Thread(null, new Runnable(){
- public void run() {
- sendOneways();
- }}, "ActiveMQ Transport Sender: " + toString(), getStackSize());
- onewayThread.start();
stoppedLatch.set(new CountDownLatch(1));
super.doStart();
}
@@ -520,11 +487,7 @@
LOG.debug("Caught exception closing socket",e);
}
}
- }
- if( onewayThread!=null ) {
- onewayThread.join();
- onewayThread = null;
- outbound.clear();
+
}
}