You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@activemq.apache.org by "Timothy Bish (JIRA)" <ji...@apache.org> on 2015/08/20 17:43:45 UTC
[jira] [Closed] (AMQ-2139) Batch up multiple socket write calls in
the TCP transport.
[ https://issues.apache.org/jira/browse/AMQ-2139?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Timothy Bish closed AMQ-2139.
-----------------------------
Resolution: Won't Fix
> Batch up multiple socket write calls in the TCP transport.
> ----------------------------------------------------------
>
> Key: AMQ-2139
> URL: https://issues.apache.org/jira/browse/AMQ-2139
> Project: ActiveMQ
> Issue Type: Improvement
> Components: Transport
> Affects Versions: 5.2.0
> Reporter: Hiram Chirino
> Assignee: Hiram Chirino
> Fix For: 6.0.0
>
>
> Investigate using an async write thread for the TCP transport. It would be able to more efficiently batch up multiple writes into a single socket write.
> Below is a patch that should be investigated. It should increase write performance of the TCP transport:
> {code}
> $ svn diff
> Index: activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
> ===================================================================
> --- activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java (revision 742546)
> +++ activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java (working copy)
> @@ -29,7 +29,9 @@
> import java.net.UnknownHostException;
> import java.util.HashMap;
> import java.util.Map;
> +import java.util.concurrent.ArrayBlockingQueue;
> import java.util.concurrent.CountDownLatch;
> +import java.util.concurrent.LinkedBlockingQueue;
> import java.util.concurrent.SynchronousQueue;
> import java.util.concurrent.ThreadFactory;
> import java.util.concurrent.ThreadPoolExecutor;
> @@ -119,6 +121,9 @@
> private Boolean tcpNoDelay;
> private Thread runnerThread;
>
> + private final ArrayBlockingQueue<Object> outbound = new ArrayBlockingQueue<Object>(100);
> + private Thread onewayThread;
> +
> /**
> * Connect to a remote Node - e.g. a Broker
> *
> @@ -157,16 +162,39 @@
> this.localLocation = null;
> setDaemon(true);
> }
> -
> +
> /**
> * A one way asynchronous send
> */
> public void oneway(Object command) throws IOException {
> checkStarted();
> - wireFormat.marshal(command, dataOut);
> - dataOut.flush();
> + try {
> + outbound.put(command);
> + } catch (InterruptedException e) {
> + throw new InterruptedIOException();
> + }
> }
>
> + protected void sendOneways() {
> + try {
> + while(!isStopped()) {
> + Object command = outbound.poll(500, TimeUnit.MILLISECONDS);
> + if( command!=null ) {
> + try {
> + while( command!=null ) {
> + wireFormat.marshal(command, dataOut);
> + command = outbound.poll();
> + }
> + dataOut.flush();
> + } catch (IOException e) {
> + getTransportListener().onException(e);
> + }
> + }
> + }
> + } catch (InterruptedException e) {
> + }
> + }
> +
> /**
> * @return pretty print of 'this'
> */
> @@ -399,6 +427,11 @@
>
> protected void doStart() throws Exception {
> connect();
> + onewayThread = new Thread(null, new Runnable(){
> + public void run() {
> + sendOneways();
> + }}, "ActiveMQ Transport Sender: " + toString(), getStackSize());
> + onewayThread.start();
> stoppedLatch.set(new CountDownLatch(1));
> super.doStart();
> }
> @@ -487,8 +520,12 @@
> LOG.debug("Caught exception closing socket",e);
> }
> }
> -
> }
> + if( onewayThread!=null ) {
> + onewayThread.join();
> + onewayThread = null;
> + outbound.clear();
> + }
> }
>
> /**
> {code}
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)