You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/05/22 21:24:47 UTC

svn commit: r777666 - /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java

Author: chirino
Date: Fri May 22 19:24:46 2009
New Revision: 777666

URL: http://svn.apache.org/viewvc?rev=777666&view=rev
Log:
rolling back change committed in rev 777209

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java?rev=777666&r1=777665&r2=777666&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java Fri May 22 19:24:46 2009
@@ -29,9 +29,7 @@
 import java.net.UnknownHostException;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -121,9 +119,6 @@
     private Boolean tcpNoDelay;
     private Thread runnerThread;
 
-    private final ArrayBlockingQueue<Object> outbound = new ArrayBlockingQueue<Object>(100);
-    private Thread onewayThread;
-
     /**
      * Connect to a remote Node - e.g. a Broker
      * 
@@ -162,37 +157,14 @@
         this.localLocation = null;
         setDaemon(true);
     }
-    
+
     /**
      * A one way asynchronous send
      */
     public void oneway(Object command) throws IOException {
         checkStarted();
-        try {
-            outbound.put(command);
-        } catch (InterruptedException e) {
-            throw new InterruptedIOException();
-        }
-    }
-
-    protected void sendOneways() {
-        try {
-            while(!isStopped()) {
-                Object command = outbound.poll(500, TimeUnit.MILLISECONDS);
-                if( command!=null ) {
-                    try {
-                        while( command!=null ) {
-                            wireFormat.marshal(command, dataOut);
-                            command = outbound.poll();
-                        }
-                        dataOut.flush();
-                    } catch (IOException e) {
-                        getTransportListener().onException(e);
-                    }
-                }
-            }
-        } catch (InterruptedException e) {
-        }
+        wireFormat.marshal(command, dataOut);
+        dataOut.flush();
     }
 
     /**
@@ -427,11 +399,6 @@
 
     protected void doStart() throws Exception {
         connect();
-        onewayThread = new Thread(null, new Runnable(){
-            public void run() {
-                sendOneways();
-            }}, "ActiveMQ Transport Sender: " + toString(), getStackSize());
-        onewayThread.start();
         stoppedLatch.set(new CountDownLatch(1));
         super.doStart();
     }
@@ -520,11 +487,7 @@
                     LOG.debug("Caught exception closing socket",e);
                 }
             }
-        }
-        if( onewayThread!=null ) {
-            onewayThread.join();
-            onewayThread = null;
-            outbound.clear();
+           
         }
     }