You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@camel.apache.org by "Claus Ibsen (Resolved) (JIRA)" <ji...@apache.org> on 2012/04/08 14:56:18 UTC

[jira] [Resolved] (CAMEL-4556) NettyProducer creating new connection on every message

     [ https://issues.apache.org/jira/browse/CAMEL-4556?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Claus Ibsen resolved CAMEL-4556.
--------------------------------

       Resolution: Fixed
    Fix Version/s: 2.9.2
                   2.10.0
    
> NettyProducer creating new connection on every message
> ------------------------------------------------------
>
>                 Key: CAMEL-4556
>                 URL: https://issues.apache.org/jira/browse/CAMEL-4556
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-netty
>    Affects Versions: 2.8.1
>            Reporter: Matthew McMahon
>            Assignee: Claus Ibsen
>            Priority: Minor
>             Fix For: 2.10.0, 2.9.2
>
>
> Using a NettyProducer without the disconnect=true option causes the route to block on the to("netty:tcp://...") call after 10 messages.
> It appears that a new socket connection is created for every message, and then after 10 connections no new connection is allowed (must be a default thread pool limit?).
> Setting disconnect=true works around the problem, because for each message a socket is connected, the message is sent, and the socket is then disconnected. However, this does not seem viable for implementations where that overhead is undesirable or where more than one response is expected on a channel.
> --
> The following small unit test reproduces the problem (originally posted at http://camel.465427.n5.nabble.com/Camel-Netty-Producer-creating-new-connection-on-every-message-td4844805.html#none):
> package netty; 
> import java.util.Arrays; 
> import java.util.Collection; 
> import java.util.concurrent.TimeUnit; 
> import java.util.concurrent.atomic.AtomicBoolean; 
> import java.util.concurrent.atomic.AtomicInteger; 
> import junit.framework.TestCase; 
> import org.apache.camel.CamelContext; 
> import org.apache.camel.Exchange; 
> import org.apache.camel.ExchangePattern; 
> import org.apache.camel.Processor; 
> import org.apache.camel.builder.RouteBuilder; 
> import org.apache.camel.impl.DefaultCamelContext; 
> import org.junit.Before; 
> import org.junit.BeforeClass; 
> import org.junit.Test; 
> import org.junit.runner.RunWith; 
> import org.junit.runners.Parameterized; 
> import org.junit.runners.Parameterized.Parameters; 
> import org.slf4j.Logger; 
> import org.slf4j.LoggerFactory; 
> /**
>  * Reproduction test for CAMEL-4556: a Netty producer endpoint is exercised once with
>  * {@code disconnect=true} and once with {@code disconnect=false}. Each run sends one
>  * request per timer tick to a local Netty consumer and passes only if more than ten
>  * responses come back within the wait period.
>  */
> @RunWith(Parameterized.class)
> public class NettyTest extends TestCase
> {
>     private static final Logger logger = LoggerFactory.getLogger(NettyTest.class);
>     // One server context is shared by every parameterized run; it is started in createServer().
>     private static final CamelContext serverContext = new DefaultCamelContext();
>     private final CamelContext clientContext = new DefaultCamelContext();
>     // Number of responses the client route has processed so far.
>     private final AtomicInteger responseCounter = new AtomicInteger(0);
>     // Flipped to true once the client has seen its eleventh response.
>     private final AtomicBoolean passedTen = new AtomicBoolean(false);
>     // Value appended to the producer URI as the "disconnect" option.
>     private final Boolean disconnectClient;
>
>     /**
>      * @param disconnectClient value used for the producer endpoint's {@code disconnect} option
>      */
>     public NettyTest(Boolean disconnectClient)
>     {
>         this.disconnectClient = disconnectClient;
>     }
>
>     /**
>      * @return the two parameter sets: {@code disconnect=true} and {@code disconnect=false}
>      */
>     @Parameters
>     public static Collection<Object[]> configs()
>     {
>         return Arrays.asList(new Object[][] { { true }, { false } });
>     }
>
>     /**
>      * Starts the server side: a Netty TCP consumer on localhost:9000 that logs each request
>      * body and replies with the constant {@code 3}.
>      */
>     @BeforeClass
>     public static void createServer() throws Exception
>     {
>         serverContext.addRoutes(new RouteBuilder()
>         {
>             @Override
>             public void configure() throws Exception
>             {
>                 from("netty:tcp://localhost:9000?sync=true&disconnectOnNoReply=false&allowDefaultCodec=true&tcpNoDelay=true&reuseAddress=true&keepAlive=false")
>                         .setExchangePattern(ExchangePattern.InOut)
>                         .process(new Processor()
>                         {
>                             @Override
>                             public void process(Exchange exchange) throws Exception
>                             {
>                                 Object body = exchange.getIn().getBody();
>                                 logger.info("Request received : Value = {}", body);
>                             }
>                         })
>                         .transform(constant(3)).stop();
>             }
>         });
>         serverContext.start();
>     }
>
>     /**
>      * Registers the client route (timer -> Netty producer -> response counter). The client
>      * context is not started here; test() starts it.
>      */
>     @Before
>     public void createClient() throws Exception
>     {
>         clientContext.addRoutes(new RouteBuilder()
>         {
>             @Override
>             public void configure() throws Exception
>             {
>                 // Generate an Echo message and ensure a Response is sent
>                 from("timer://echoTimer?delay=1s&fixedRate=true&period=1s")
>                         .setExchangePattern(ExchangePattern.InOut)
>                         .transform()
>                         .constant(2)
>                         .to(ExchangePattern.InOut, "netty:tcp://localhost:9000?allowDefaultCodec=true&tcpNoDelay=true&reuseAddress=true&keepAlive=false&sync=true&disconnect=" + disconnectClient)
>                         .process(new Processor()
>                         {
>                             @Override
>                             public void process(Exchange exchange) throws Exception
>                             {
>                                 Object body = exchange.getIn().getBody();
>                                 // Use the value returned by the increment itself so the logged
>                                 // number and the threshold check refer to the same response.
>                                 int received = responseCounter.incrementAndGet();
>                                 logger.info("Response number {} : Value = {}", received, body);
>                                 if (received > 10) {
>                                     passedTen.set(true);
>                                 }
>                             }
>                         }).stop();
>             }
>         });
>     }
>
>     /**
>      * Runs the client for 15 seconds and then checks that more than ten responses arrived.
>      */
>     @Test
>     public void test() throws Exception
>     {
>         // NOTE(review): timeout value 1 is presumably in seconds (the strategy's default unit) - confirm.
>         clientContext.getShutdownStrategy().setTimeout(1);
>         try {
>             clientContext.start();
>             logger.info("Disconnect = {}", this.disconnectClient);
>             Thread.sleep(TimeUnit.SECONDS.toMillis(15));
>         } finally {
>             // Always stop the client, even if startup fails or the sleep is interrupted.
>             clientContext.stop();
>         }
>         // The message is only shown on failure, so it must describe the failure.
>         assertTrue("Expected more than 10 responses with disconnect=" + disconnectClient
>                 + " but received " + responseCounter.get(), passedTen.get());
>     }
> }

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira