Posted to dev@activemq.apache.org by "MARCHAL (JIRA)" <ji...@apache.org> on 2009/06/15 18:48:35 UTC

[jira] Commented: (AMQ-2139) Batch up multiple socket write calls in the TCP transport.

    [ https://issues.apache.org/activemq/browse/AMQ-2139?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=52264#action_52264 ] 

MARCHAL commented on AMQ-2139:
------------------------------

Hi

I'm working with ServiceMix 3 and ran into problems during load testing.
I have multiple ServiceMix instances interconnected through ActiveMQ brokers.
After a long investigation and JProbe analysis, I found that the problem is in the TcpTransport.

So I tried your proposal, and it seems to work fine (with a new TcpTransportFactory and a PooledTcpTransport).
But I don't think it is the best solution for large messages, because only one socket is used to communicate between two ServiceMix containers.

My questions are:
  Is a MutexTransport still necessary, given that the OpenWireFormat marshal method is already serialized?
  Do you think it is possible to create a pool of sockets in the TCP transport to build a network of brokers?
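
To make the first question concrete, here is a minimal, self-contained sketch of the batching idea as I understand it. It is not the patch itself and not ActiveMQ code: the names BatchingWriterSketch, CountingStream and run are hypothetical, plain strings stand in for commands, and DataOutputStream.writeUTF stands in for wireFormat.marshal. Producers only enqueue; a single sender thread drains the queue and flushes once per batch.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration only; none of these names exist in ActiveMQ.
class BatchingWriterSketch {

    /** In-memory stand-in for the socket stream that counts flush() calls. */
    static final class CountingStream extends ByteArrayOutputStream {
        final AtomicInteger flushes = new AtomicInteger();

        @Override
        public void flush() {
            flushes.incrementAndGet();
        }
    }

    private final BlockingQueue<String> outbound = new ArrayBlockingQueue<>(100);
    private final DataOutputStream dataOut;
    private final Thread sender;
    private volatile boolean stopped;

    BatchingWriterSketch(OutputStream out) {
        this.dataOut = new DataOutputStream(out);
        this.sender = new Thread(this::sendLoop, "sketch-sender");
    }

    void start() {
        sender.start();
    }

    /** Called by any number of threads; blocks while the queue is full. */
    void oneway(String command) throws InterruptedException {
        outbound.put(command);
    }

    /** Asks the sender to drain what is still queued, then waits for it to exit. */
    void stop() throws InterruptedException {
        stopped = true;
        sender.join();
    }

    /** Only this thread touches dataOut, so the writes need no extra lock here. */
    private void sendLoop() {
        try {
            while (!stopped || !outbound.isEmpty()) {
                String command = outbound.poll(50, TimeUnit.MILLISECONDS);
                if (command == null) {
                    continue;
                }
                // Write everything that is already queued, then flush once.
                while (command != null) {
                    dataOut.writeUTF(command);
                    command = outbound.poll();
                }
                dataOut.flush();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (IOException e) {
            // A real transport would report this to its listener instead.
            throw new IllegalStateException(e);
        }
    }

    /** Runs the sketch and returns {messages written, flush calls}. */
    static int[] run(int producers, int perProducer) {
        CountingStream sink = new CountingStream();
        BatchingWriterSketch writer = new BatchingWriterSketch(sink);
        writer.start();
        try {
            Thread[] threads = new Thread[producers];
            for (int p = 0; p < producers; p++) {
                final int id = p;
                threads[p] = new Thread(() -> {
                    try {
                        for (int i = 0; i < perProducer; i++) {
                            writer.oneway("cmd-" + id + "-" + i);
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
                threads[p].start();
            }
            for (Thread t : threads) {
                t.join();
            }
            writer.stop();
            // Read the records back to check that none were lost or interleaved.
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(sink.toByteArray()));
            int messages = 0;
            while (in.available() > 0) {
                in.readUTF();
                messages++;
            }
            return new int[] { messages, sink.flushes.get() };
        } catch (InterruptedException | IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        int[] result = run(4, 250);
        System.out.println("messages=" + result[0] + ", flushes=" + result[1]);
    }
}
```

In this sketch the stream is confined to the sender thread, which is why I wonder whether the mutex is still needed on the write path. The number of flushes varies from run to run, since it depends on how many commands are already queued each time the sender wakes up.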

Thanks

> Batch up multiple socket write calls in the TCP transport.
> ----------------------------------------------------------
>
>                 Key: AMQ-2139
>                 URL: https://issues.apache.org/activemq/browse/AMQ-2139
>             Project: ActiveMQ
>          Issue Type: Improvement
>          Components: Transport
>    Affects Versions: 5.2.0
>            Reporter: Hiram Chirino
>            Assignee: Hiram Chirino
>             Fix For: 6.0.0
>
>
> Investigate using an async write thread for the TCP transport.  It would be able to more efficiently batch up multiple writes into a single socket write. 
> Below is a patch that should be investigated.  It should increase write performance of the TCP transport:
> {code}
> $ svn diff
> Index: activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
> ===================================================================
> --- activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java	(revision 742546)
> +++ activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java	(working copy)
> @@ -29,7 +29,9 @@
>  import java.net.UnknownHostException;
>  import java.util.HashMap;
>  import java.util.Map;
> +import java.util.concurrent.ArrayBlockingQueue;
>  import java.util.concurrent.CountDownLatch;
> +import java.util.concurrent.LinkedBlockingQueue;
>  import java.util.concurrent.SynchronousQueue;
>  import java.util.concurrent.ThreadFactory;
>  import java.util.concurrent.ThreadPoolExecutor;
> @@ -119,6 +121,9 @@
>      private Boolean tcpNoDelay;
>      private Thread runnerThread;
>  
> +    private final ArrayBlockingQueue<Object> outbound = new ArrayBlockingQueue<Object>(100);
> +    private Thread onewayThread;
> +
>      /**
>       * Connect to a remote Node - e.g. a Broker
>       * 
> @@ -157,16 +162,39 @@
>          this.localLocation = null;
>          setDaemon(true);
>      }
> -
> +    
>      /**
>       * A one way asynchronous send
>       */
>      public void oneway(Object command) throws IOException {
>          checkStarted();
> -        wireFormat.marshal(command, dataOut);
> -        dataOut.flush();
> +        try {
> +            outbound.put(command);
> +        } catch (InterruptedException e) {
> +            throw new InterruptedIOException();
> +        }
>      }
>  
> +    protected void sendOneways() {
> +        try {
> +            while(!isStopped()) {
> +                Object command = outbound.poll(500, TimeUnit.MILLISECONDS);
> +                if( command!=null ) {
> +                    try {
> +                        while( command!=null ) {
> +                            wireFormat.marshal(command, dataOut);
> +                            command = outbound.poll();
> +                        }
> +                        dataOut.flush();
> +                    } catch (IOException e) {
> +                        getTransportListener().onException(e);
> +                    }
> +                }
> +            }
> +        } catch (InterruptedException e) {
> +        }
> +    }
> +
>      /**
>       * @return pretty print of 'this'
>       */
> @@ -399,6 +427,11 @@
>  
>      protected void doStart() throws Exception {
>          connect();
> +        onewayThread = new Thread(null, new Runnable(){
> +            public void run() {
> +                sendOneways();
> +            }}, "ActiveMQ Transport Sender: " + toString(), getStackSize());
> +        onewayThread.start();
>          stoppedLatch.set(new CountDownLatch(1));
>          super.doStart();
>      }
> @@ -487,8 +520,12 @@
>                      LOG.debug("Caught exception closing socket",e);
>                  }
>              }
> -           
>          }
> +        if( onewayThread!=null ) {
> +            onewayThread.join();
> +            onewayThread = null;
> +            outbound.clear();
> +        }
>      }
>  
>      /**
> {code}

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.