Posted to dev@activemq.apache.org by "MARCHAL (JIRA)" <ji...@apache.org> on 2009/06/15 18:48:35 UTC
[jira] Commented: (AMQ-2139) Batch up multiple socket write calls in the TCP transport.
[ https://issues.apache.org/activemq/browse/AMQ-2139?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=52264#action_52264 ]
MARCHAL commented on AMQ-2139:
------------------------------
Hi,
I'm working with ServiceMix 3 and ran into problems during load testing.
I have multiple ServiceMix instances interconnected through ActiveMQ brokers.
After a long investigation and JProbe analysis, I found that the problem is in the TcpTransport.
So I tried your proposal, and it seems to work fine (with a new TcpTransportFactory and a PooledTcpTransport).
However, I don't think it is the best solution for large messages, because only one socket is used to communicate between two ServiceMix containers.
My questions are:
Is a MutexTransport still necessary, given that calls to the OpenWireFormat marshal method are already serialized?
Do you think it would be possible to create a pool of sockets in the TCP transport to build a network of brokers?
Thanks
> Batch up multiple socket write calls in the TCP transport.
> ----------------------------------------------------------
>
> Key: AMQ-2139
> URL: https://issues.apache.org/activemq/browse/AMQ-2139
> Project: ActiveMQ
> Issue Type: Improvement
> Components: Transport
> Affects Versions: 5.2.0
> Reporter: Hiram Chirino
> Assignee: Hiram Chirino
> Fix For: 6.0.0
>
>
> Investigate using an async write thread for the TCP transport. It would be able to more efficiently batch up multiple writes into a single socket write.
> Below is a patch that should be investigated. It should increase write performance of the TCP transport:
> {code}
> $ svn diff
> Index: activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
> ===================================================================
> --- activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java (revision 742546)
> +++ activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java (working copy)
> @@ -29,7 +29,9 @@
> import java.net.UnknownHostException;
> import java.util.HashMap;
> import java.util.Map;
> +import java.util.concurrent.ArrayBlockingQueue;
> import java.util.concurrent.CountDownLatch;
> +import java.util.concurrent.LinkedBlockingQueue;
> import java.util.concurrent.SynchronousQueue;
> import java.util.concurrent.ThreadFactory;
> import java.util.concurrent.ThreadPoolExecutor;
> @@ -119,6 +121,9 @@
> private Boolean tcpNoDelay;
> private Thread runnerThread;
>
> + private final ArrayBlockingQueue<Object> outbound = new ArrayBlockingQueue<Object>(100);
> + private Thread onewayThread;
> +
> /**
> * Connect to a remote Node - e.g. a Broker
> *
> @@ -157,16 +162,39 @@
> this.localLocation = null;
> setDaemon(true);
> }
> -
> +
> /**
> * A one way asynchronous send
> */
> public void oneway(Object command) throws IOException {
> checkStarted();
> - wireFormat.marshal(command, dataOut);
> - dataOut.flush();
> + try {
> + outbound.put(command);
> + } catch (InterruptedException e) {
> + throw new InterruptedIOException();
> + }
> }
>
> + protected void sendOneways() {
> + try {
> + while(!isStopped()) {
> + Object command = outbound.poll(500, TimeUnit.MILLISECONDS);
> + if( command!=null ) {
> + try {
> + while( command!=null ) {
> + wireFormat.marshal(command, dataOut);
> + command = outbound.poll();
> + }
> + dataOut.flush();
> + } catch (IOException e) {
> + getTransportListener().onException(e);
> + }
> + }
> + }
> + } catch (InterruptedException e) {
> + }
> + }
> +
> /**
> * @return pretty print of 'this'
> */
> @@ -399,6 +427,11 @@
>
> protected void doStart() throws Exception {
> connect();
> + onewayThread = new Thread(null, new Runnable(){
> + public void run() {
> + sendOneways();
> + }}, "ActiveMQ Transport Sender: " + toString(), getStackSize());
> + onewayThread.start();
> stoppedLatch.set(new CountDownLatch(1));
> super.doStart();
> }
> @@ -487,8 +520,12 @@
> LOG.debug("Caught exception closing socket",e);
> }
> }
> -
> }
> + if( onewayThread!=null ) {
> + onewayThread.join();
> + onewayThread = null;
> + outbound.clear();
> + }
> }
>
> /**
> {code}
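
For readers who want to try the idea outside the ActiveMQ tree, the batching approach in the patch can be sketched as a standalone class. This is only an illustration under stated assumptions: BatchingWriter and its methods are hypothetical names, not ActiveMQ API, and the frames are assumed to be already-marshalled byte arrays rather than commands passed through a WireFormat.

```java
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for the transport's sender thread; not ActiveMQ code. */
class BatchingWriter {
    // Bounded queue, as in the patch: producers block once 100 frames are pending.
    private final BlockingQueue<byte[]> outbound = new ArrayBlockingQueue<byte[]>(100);
    private final OutputStream out;
    private final Thread sender;
    private volatile boolean stopped;
    private volatile IOException failure;

    BatchingWriter(OutputStream out) {
        this.out = out;
        this.sender = new Thread(this::drainLoop, "batching-writer");
        this.sender.setDaemon(true);
    }

    void start() {
        sender.start();
    }

    /** Queues one frame; reports an earlier write failure instead of queueing. */
    void send(byte[] frame) throws IOException {
        if (failure != null) {
            throw failure;
        }
        try {
            outbound.put(frame);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new InterruptedIOException();
        }
    }

    /** Stops the sender once it has written everything queued before this call. */
    void stop() throws InterruptedException {
        stopped = true;
        sender.join();
    }

    private void drainLoop() {
        try {
            while (true) {
                byte[] frame = outbound.poll(50, TimeUnit.MILLISECONDS);
                if (frame == null) {
                    if (!stopped) {
                        continue;
                    }
                    // Stop requested: look once more so no queued frame is lost.
                    frame = outbound.poll();
                    if (frame == null) {
                        return;
                    }
                }
                // Write every frame that is already queued, then flush once.
                while (frame != null) {
                    out.write(frame);
                    frame = outbound.poll();
                }
                out.flush();
            }
        } catch (IOException e) {
            failure = e; // surfaced to the next send() call
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Because a single thread owns the output stream, writes are serialized without a lock on the send path. The sketch is deliberately minimal: after a write failure the sender exits, so a producer that is already blocked on a full queue would stay blocked, and a real transport would need to handle that case.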