You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@camel.apache.org by "Claus Ibsen (Resolved) (JIRA)" <ji...@apache.org> on 2012/04/08 14:56:18 UTC
[jira] [Resolved] (CAMEL-4556) NettyProducer creating new
connection on every message
[ https://issues.apache.org/jira/browse/CAMEL-4556?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Claus Ibsen resolved CAMEL-4556.
--------------------------------
Resolution: Fixed
Fix Version/s: 2.9.2
2.10.0
> NettyProducer creating new connection on every message
> ------------------------------------------------------
>
> Key: CAMEL-4556
> URL: https://issues.apache.org/jira/browse/CAMEL-4556
> Project: Camel
> Issue Type: Bug
> Components: camel-netty
> Affects Versions: 2.8.1
> Reporter: Matthew McMahon
> Assignee: Claus Ibsen
> Priority: Minor
> Fix For: 2.10.0, 2.9.2
>
>
> Using a NettyProducer without the disconnect=true configuration is causing the route to block after 10 messages on the to("netty://tcp....") call.
> It appears that a new socket connection is created for every message, and then after 10 connections no new connection is allowed (must be a default thread pool limit?).
> Using the disconnect=true option fixes the problem as a socket is connected, message sent, then disconnected. But this does not seem viable for implementations where that overhead is undesirable or where more than one response is expected on a channel.
> --
> This is a small Unit Test that shows the problem (http://camel.465427.n5.nabble.com/Camel-Netty-Producer-creating-new-connection-on-every-message-td4844805.html#none)
> package netty;
> import java.util.Arrays;
> import java.util.Collection;
> import java.util.concurrent.TimeUnit;
> import java.util.concurrent.atomic.AtomicBoolean;
> import java.util.concurrent.atomic.AtomicInteger;
> import junit.framework.TestCase;
> import org.apache.camel.CamelContext;
> import org.apache.camel.Exchange;
> import org.apache.camel.ExchangePattern;
> import org.apache.camel.Processor;
> import org.apache.camel.builder.RouteBuilder;
> import org.apache.camel.impl.DefaultCamelContext;
> import org.junit.Before;
> import org.junit.BeforeClass;
> import org.junit.Test;
> import org.junit.runner.RunWith;
> import org.junit.runners.Parameterized;
> import org.junit.runners.Parameterized.Parameters;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
> @RunWith(Parameterized.class)
> public class NettyTest extends TestCase
> {
> private final static Logger logger = LoggerFactory.getLogger(NettyTest.class);
> private final static CamelContext serverContext = new DefaultCamelContext();
> private final CamelContext clientContext = new DefaultCamelContext();
> private final AtomicInteger responseCounter = new AtomicInteger(0);
> private final AtomicBoolean passedTen = new AtomicBoolean(false);
> private Boolean disconnectClient;
> public NettyTest(Boolean disconnectClient)
> {
> this.disconnectClient = disconnectClient;
> }
> @Parameters
> public static Collection<Object[]> configs()
> {
> return Arrays.asList(new Object[][] { { true }, { false } });
> }
> @BeforeClass
> public static void createServer() throws Exception
> {
> serverContext.addRoutes(new RouteBuilder()
> {
> @Override
> public void configure() throws Exception
> {
> from("netty:tcp://localhost:9000?sync=true&disconnectOnNoReply=false&allowDefaultCodec=true&tcpNoDelay=true&reuseAddress=true&keepAlive=false")
> .setExchangePattern(ExchangePattern.InOut)
> .process(new Processor() {
> @Override
> public void process(Exchange exchange) throws Exception
> {
> Object body = exchange.getIn().getBody();
> logger.info("Request received : Value = {}", body);
> }
>
> })
> .transform(constant(3)).stop();
> }
> });
> serverContext.start();
> }
> @Before
> public void createClient() throws Exception
> {
> clientContext.addRoutes(new RouteBuilder()
> {
> @Override
> public void configure() throws Exception
> {
> // Generate an Echo message and ensure a Response is sent
> from("timer://echoTimer?delay=1s&fixedRate=true&period=1s")
> .setExchangePattern(ExchangePattern.InOut)
> .transform()
> .constant(2)
> .to(ExchangePattern.InOut, "netty:tcp://localhost:9000?allowDefaultCodec=true&tcpNoDelay=true&reuseAddress=true&keepAlive=false&sync=true&disconnect=" + disconnectClient.toString())
> .process(new Processor()
> {
> @Override
> public void process(Exchange exchange) throws Exception
> {
> Object body = exchange.getIn().getBody();
> logger.info("Response number {} : Value = {}",
> responseCounter.incrementAndGet(), body);
> if (responseCounter.get() > 10) {
> passedTen.set(true);
> }
> }
> }).stop();
> }
> });
> }
> @Test
> public void test() throws Exception
> {
> clientContext.getShutdownStrategy().setTimeout(1);
> clientContext.start();
> logger.info("Disconnect = {}", this.disconnectClient);
> Thread.sleep(TimeUnit.SECONDS.toMillis(15));
> clientContext.stop();
> assertTrue("More than 10 responses have been received", passedTen.get());
> }
> }
--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira