You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@camel.apache.org by "onder sezgin (JIRA)" <ji...@apache.org> on 2016/08/08 13:12:20 UTC
[jira] [Commented] (CAMEL-10171) Camel CXF expired continuations
cause memory leak
[ https://issues.apache.org/jira/browse/CAMEL-10171?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15411781#comment-15411781 ]
onder sezgin commented on CAMEL-10171:
--------------------------------------
I went through the problem. As i can see, cxf-rt-transports-http-jetty-3.1.7 has no such method to propagate the continuation time out back to camel-cxf. First, Servlet3Continuation and and JettyContinuationWrapper should have the required method implementation to query if timeout is expired and camel-cxf should have such implementation as the following;
else if (continuation.isExpired()) {
// mark the exchange with server timeout error
// trigger the exhange as done so that it is not counted as inflight.
.....
}
I will try and see if i can provide PR.
> Camel CXF expired continuations cause memory leak
> -------------------------------------------------
>
> Key: CAMEL-10171
> URL: https://issues.apache.org/jira/browse/CAMEL-10171
> Project: Camel
> Issue Type: Bug
> Components: camel-cxf
> Affects Versions: 2.17.1
> Reporter: Damian Malczyk
> Fix For: 2.17.3, 2.18.0
>
>
> Looks like exchanges expired by CXF continuation timeout are being accumulated in InflightRepository. Tested with Camel 2.17.1 and cxf-rt-transports-http-jetty:
> Dependencies:
> {code}<dependencies>
> <dependency>
> <groupId>org.apache.camel</groupId>
> <artifactId>camel-core</artifactId>
> <version>2.17.1</version>
> </dependency>
> <dependency>
> <groupId>org.apache.camel</groupId>
> <artifactId>camel-cxf</artifactId>
> <version>2.17.1</version>
> </dependency>
> <dependency>
> <groupId>org.apache.cxf</groupId>
> <artifactId>cxf-rt-transports-http-jetty</artifactId>
> <version>3.1.5</version>
> </dependency>
> </dependencies>{code}
> Reproducer:
> {code}import org.apache.camel.CamelContext;
> import org.apache.camel.builder.RouteBuilder;
> import org.apache.camel.component.cxf.CxfEndpoint;
> import org.apache.camel.component.cxf.DataFormat;
> import org.apache.camel.impl.DefaultCamelContext;
> import org.springframework.util.StreamUtils;
> import org.w3c.dom.Document;
> import javax.xml.parsers.DocumentBuilder;
> import javax.xml.parsers.DocumentBuilderFactory;
> import javax.xml.soap.MessageFactory;
> import javax.xml.soap.SOAPMessage;
> import java.io.ByteArrayInputStream;
> import java.io.ByteArrayOutputStream;
> import java.net.HttpURLConnection;
> import java.net.URL;
> import java.util.Timer;
> import java.util.TimerTask;
> import java.util.concurrent.Executor;
> import java.util.concurrent.Executors;
> public class Sample {
> private final static String URI = "http://127.0.0.1:8080/";
> private final static long CONTINUATION_TIMEOUT = 100L;
> private final static long DELAYER_VALUE = 200L;
> private final static int SENDER_THREADS = Runtime.getRuntime().availableProcessors();
> private final static int MESSAGES_PER_SENDER = 10000;
> private static void setupCamel() throws Exception {
> final CamelContext camelContext = new DefaultCamelContext();
> final CxfEndpoint endpoint = (CxfEndpoint)camelContext.getEndpoint( "cxf://" + URI );
> endpoint.setContinuationTimeout( CONTINUATION_TIMEOUT );
> endpoint.setDataFormat( DataFormat.PAYLOAD );
> camelContext.addRoutes( new RouteBuilder() {
> public void configure() throws Exception {
> from( endpoint )
> .threads()
> .setBody( constant( "<ok />" ) )
> .delay( DELAYER_VALUE )
> .end();
> }
> });
> final TimerTask repoSizeReporter = new TimerTask() {
> public void run() {
> System.out.println( "Inflight repository size: " + camelContext.getInflightRepository().size() );
> System.gc();
> System.out.println( "Memory usage: " + (Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory())/(1024*1024) + "MB" );
> }
> };
> final Timer repoSizeReporterTimer = new Timer();
> repoSizeReporterTimer.schedule( repoSizeReporter, 1000, 1000 );
> camelContext.start();
> }
> private static byte[] createSoapMessage() throws Exception {
> final StringBuilder payloadBuilder = new StringBuilder( "<payload>" );
> for( int i = 0; i < 5000; i++ ) {
> payloadBuilder.append( "<payloadElement />" );
> }
> final String payload = payloadBuilder.append( "</payload>" ).toString();
> final DocumentBuilder documentBuilder = DocumentBuilderFactory.newInstance().newDocumentBuilder();
> final Document payloadDocument = documentBuilder.parse( new ByteArrayInputStream( payload.getBytes() ) );
> final ByteArrayOutputStream soapOutStream = new ByteArrayOutputStream();
> final SOAPMessage message = MessageFactory.newInstance().createMessage();
> message.getSOAPBody().addDocument( payloadDocument );
> message.writeTo( soapOutStream );
> return soapOutStream.toByteArray();
> }
> private static Runnable soapSender() {
> return () -> {
> try {
> final byte[] soapMessage = createSoapMessage();
> for( int i = 0; i < MESSAGES_PER_SENDER; i++ ) {
> final HttpURLConnection connection = (HttpURLConnection)new URL( URI ).openConnection();
> connection.setDoOutput( true );
> connection.setRequestProperty( "Content-Type", "text/xml" );
> connection.setRequestProperty( "SOAPAction", "\"\"" );
> connection.setRequestMethod( "POST" );
> connection.setRequestProperty( "Accept", "*/*" );
> connection.connect();
> StreamUtils.copy( soapMessage, connection.getOutputStream() );
> connection.getResponseCode();
> connection.disconnect();
> }
> } catch ( final Exception ex ) {
> ex.printStackTrace();
> }
> };
> }
> public static void main(String[] args) throws Exception {
> setupCamel();
> final Executor executor = Executors.newFixedThreadPool( SENDER_THREADS );
> for( int i = 0; i < SENDER_THREADS; i++ ) {
> executor.execute( soapSender() );
> }
> }
> }{code}
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)