You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@camel.apache.org by "Damian Malczyk (JIRA)" <ji...@apache.org> on 2016/08/11 18:40:20 UTC

[jira] [Comment Edited] (CAMEL-10171) Camel CXF expired continuations cause memory leak

    [ https://issues.apache.org/jira/browse/CAMEL-10171?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15417712#comment-15417712 ] 

Damian Malczyk edited comment on CAMEL-10171 at 8/11/16 6:39 PM:
-----------------------------------------------------------------

I think that would be possible if {code}continuation.setObject(camelExchange){code} were placed just before {code}continuation.suspend(cxfEndpoint.getContinuationTimeout()){code} instead of inside the processing callback, as it is now.

However, populating the CxfExchange from the Camel Exchange after an expiration event may not be the best idea, because you would be touching something that is still being processed somewhere else, and you would most likely leak to the end user something that the programmer doesn't want them to see.

I think the best approach would be to give the programmer an option to define a callback at the component (and/or endpoint) level that lets them populate the CxfExchange themselves in case of a continuation timeout.


was (Author: ddms):
I think that is possible if {code}continuation.setObject(camelExchange){code} would be placed just before {code}continuation.suspend(cxfEndpoint.getContinuationTimeout()){code} instead of inside processing callback like it is now.

> Camel CXF expired continuations cause memory leak
> -------------------------------------------------
>
>                 Key: CAMEL-10171
>                 URL: https://issues.apache.org/jira/browse/CAMEL-10171
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-cxf
>    Affects Versions: 2.17.1
>            Reporter: Damian Malczyk
>
> Looks like exchanges expired by CXF continuation timeout are being accumulated in InflightRepository. Tested with Camel 2.17.1 and cxf-rt-transports-http-jetty:
> Dependencies:
> {code}<dependencies>
>         <dependency>
>             <groupId>org.apache.camel</groupId>
>             <artifactId>camel-core</artifactId>
>             <version>2.17.1</version>
>         </dependency>
>         <dependency>
>             <groupId>org.apache.camel</groupId>
>             <artifactId>camel-cxf</artifactId>
>             <version>2.17.1</version>
>         </dependency>
>         <dependency>
>             <groupId>org.apache.cxf</groupId>
>             <artifactId>cxf-rt-transports-http-jetty</artifactId>
>             <version>3.1.5</version>
>         </dependency>
>     </dependencies>{code}
> Reproducer:
> {code}import org.apache.camel.CamelContext;
> import org.apache.camel.builder.RouteBuilder;
> import org.apache.camel.component.cxf.CxfEndpoint;
> import org.apache.camel.component.cxf.DataFormat;
> import org.apache.camel.impl.DefaultCamelContext;
> import org.springframework.util.StreamUtils;
> import org.w3c.dom.Document;
> import javax.xml.parsers.DocumentBuilder;
> import javax.xml.parsers.DocumentBuilderFactory;
> import javax.xml.soap.MessageFactory;
> import javax.xml.soap.SOAPMessage;
> import java.io.ByteArrayInputStream;
> import java.io.ByteArrayOutputStream;
> import java.net.HttpURLConnection;
> import java.net.URL;
> import java.util.Timer;
> import java.util.TimerTask;
> import java.util.concurrent.Executor;
> import java.util.concurrent.Executors;
> public class Sample { // Reproducer: serves a CXF endpoint through a delayed Camel route, floods it with SOAP POSTs and prints the inflight-repository size
>     private final static String URI = "http://127.0.0.1:8080/"; // used both as the cxf:// endpoint address and as the senders' target URL
>     private final static long CONTINUATION_TIMEOUT = 100L; // passed to endpoint.setContinuationTimeout(...)
>     private final static long DELAYER_VALUE = 200L; // passed to .delay(...) in the route; twice CONTINUATION_TIMEOUT, presumably so the continuation expires before the route completes
>     private final static int SENDER_THREADS = Runtime.getRuntime().availableProcessors(); // one sender task per available processor
>     private final static int MESSAGES_PER_SENDER = 10000; // number of requests each sender issues
>     private static void setupCamel() throws Exception { // builds the context, the route and a periodic reporter, then starts the context
>         final CamelContext camelContext = new DefaultCamelContext();
>         final CxfEndpoint endpoint = (CxfEndpoint)camelContext.getEndpoint( "cxf://" + URI );
>         endpoint.setContinuationTimeout( CONTINUATION_TIMEOUT );
>         endpoint.setDataFormat( DataFormat.PAYLOAD );
>         camelContext.addRoutes( new RouteBuilder() {
>             public void configure() throws Exception { // route: endpoint -> threads() -> constant "<ok />" body -> delay
>                 from( endpoint )
>                 .threads()
>                 .setBody( constant( "<ok />" ) )
>                 .delay( DELAYER_VALUE )
>                 .end();
>             }
>         });
>         final TimerTask repoSizeReporter = new TimerTask() {
>             public void run() { // prints the inflight-repository size and the used heap in MB
>                 System.out.println( "Inflight repository size: " + camelContext.getInflightRepository().size() );
>                 System.gc(); // request a collection before measuring; gc() is only a hint to the JVM
>                 System.out.println( "Memory usage: " + (Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory())/(1024*1024) + "MB" );
>             }
>         };
>         final Timer repoSizeReporterTimer = new Timer();
>         repoSizeReporterTimer.schedule( repoSizeReporter, 1000, 1000 ); // first report after 1000 ms, then every 1000 ms
>         camelContext.start();
>     }
>     private static byte[] createSoapMessage() throws Exception { // returns the serialized bytes of a SOAP message whose body holds <payload> with 5000 empty <payloadElement /> children
>         final StringBuilder payloadBuilder = new StringBuilder( "<payload>" );
>         for( int i = 0; i < 5000; i++ ) {
>             payloadBuilder.append( "<payloadElement />" );
>         }
>         final String payload = payloadBuilder.append( "</payload>" ).toString();
>         final DocumentBuilder documentBuilder = DocumentBuilderFactory.newInstance().newDocumentBuilder();
>         final Document payloadDocument = documentBuilder.parse( new ByteArrayInputStream( payload.getBytes() ) ); // NOTE(review): getBytes() uses the platform default charset; the payload built above is ASCII-only
>         final ByteArrayOutputStream soapOutStream = new ByteArrayOutputStream();
>         final SOAPMessage message = MessageFactory.newInstance().createMessage();
>         message.getSOAPBody().addDocument( payloadDocument );
>         message.writeTo( soapOutStream );
>         return soapOutStream.toByteArray();
>     }
>     private static Runnable soapSender() { // returns a task that POSTs the same SOAP message MESSAGES_PER_SENDER times, opening a new connection per request
>         return () -> {
>             try {
>                 final byte[] soapMessage = createSoapMessage(); // built once and reused for every request of this sender
>                 for( int i = 0; i < MESSAGES_PER_SENDER; i++ ) {
>                     final HttpURLConnection connection = (HttpURLConnection)new URL( URI ).openConnection();
>                     connection.setDoOutput( true );
>                     connection.setRequestProperty( "Content-Type", "text/xml" );
>                     connection.setRequestProperty( "SOAPAction", "\"\"" );
>                     connection.setRequestMethod( "POST" );
>                     connection.setRequestProperty( "Accept", "*/*" );
>                     connection.connect();
>                     StreamUtils.copy( soapMessage, connection.getOutputStream() );
>                     connection.getResponseCode(); // status code is discarded; presumably called only to wait for the server's response
>                     connection.disconnect();
>                 }
>             } catch ( final Exception ex ) { // any failure ends this sender's loop; only the stack trace is printed
>                 ex.printStackTrace();
>             }
>         };
>     }
>     public static void main(String[] args) throws Exception { // starts Camel, then runs SENDER_THREADS senders on a fixed-size pool
>         setupCamel();
>         final Executor executor = Executors.newFixedThreadPool( SENDER_THREADS ); // NOTE(review): the pool is never shut down in this class
>         for( int i = 0; i < SENDER_THREADS; i++ ) {
>             executor.execute( soapSender() );
>         }
>     }
> }{code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)