You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2016/08/17 07:52:48 UTC

[1/2] camel git commit: CAMEL-10171 memory leak when continuation expires

Repository: camel
Updated Branches:
  refs/heads/camel-2.17.x 19f619eee -> e822ae58c
  refs/heads/master f39b83eeb -> 2e9bce8a0


CAMEL-10171 memory leak when continuation expires

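continuation.setObject(camelExchange) is now called earlier (right after
suspend), and a timeout exception is set on the camelExchange when the
continuation expires. Calling an isExpired method through the Continuation
interface was abandoned, and the issues below became invalid, as discussed
in CXF-7011: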

https://issues.apache.org/jira/browse/CXF-7002
https://issues.apache.org/jira/browse/CXF-7011

Instead, a block that checks the continuation state (not resumed and no
longer pending) means the same thing; see:

https://issues.apache.org/jira/browse/CXF-7011?focusedCommentId=15422696&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15422696


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/2e9bce8a
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/2e9bce8a
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/2e9bce8a

Branch: refs/heads/master
Commit: 2e9bce8a064b6694cb7985955582baca90698b0c
Parents: f39b83e
Author: Önder sezgin <on...@gmail.com>
Authored: Tue Aug 16 22:22:46 2016 +0300
Committer: Önder sezgin <on...@gmail.com>
Committed: Tue Aug 16 22:22:46 2016 +0300

----------------------------------------------------------------------
 .../org/apache/camel/component/cxf/CxfConsumer.java   | 14 +++++++++++++-
 .../camel/component/cxf/jaxrs/CxfRsInvoker.java       | 14 +++++++++++++-
 2 files changed, 26 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/2e9bce8a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java
index 2271e2f..e16127b 100644
--- a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java
+++ b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java
@@ -26,6 +26,7 @@ import org.w3c.dom.Element;
 
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.ExchangePattern;
+import org.apache.camel.ExchangeTimedOutException;
 import org.apache.camel.Processor;
 import org.apache.camel.component.cxf.common.message.CxfConstants;
 import org.apache.camel.impl.DefaultConsumer;
@@ -148,6 +149,8 @@ public class CxfConsumer extends DefaultConsumer {
 
                     // The continuation could be called before the suspend is called
                     continuation.suspend(cxfEndpoint.getContinuationTimeout());
+                    
+                    continuation.setObject(camelExchange);
 
                     // use the asynchronous API to process the exchange
                     getAsyncProcessor().process(camelExchange, new AsyncCallback() {
@@ -156,7 +159,6 @@ public class CxfConsumer extends DefaultConsumer {
                             synchronized (continuation) {
                                 LOG.trace("Resuming continuation of exchangeId: {}", camelExchange.getExchangeId());
                                 // resume processing after both, sync and async callbacks
-                                continuation.setObject(camelExchange);
                                 continuation.resume();
                             }
                         }
@@ -170,6 +172,16 @@ public class CxfConsumer extends DefaultConsumer {
                         CxfConsumer.this.doneUoW(camelExchange);
                     }
 
+                } else if (!continuation.isResumed() && !continuation.isPending()) {
+                    org.apache.camel.Exchange camelExchange = (org.apache.camel.Exchange)continuation.getObject();
+                    try {
+                        if (!continuation.isPending()) {
+                            camelExchange.setException(new ExchangeTimedOutException(camelExchange, cxfEndpoint.getContinuationTimeout()));
+                        }
+                        setResponseBack(cxfExchange, camelExchange);
+                    } finally {
+                        CxfConsumer.this.doneUoW(camelExchange);
+                    }
                 }
             }
             return null;

http://git-wip-us.apache.org/repos/asf/camel/blob/2e9bce8a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsInvoker.java
----------------------------------------------------------------------
diff --git a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsInvoker.java b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsInvoker.java
index fb999f0..01563d3 100644
--- a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsInvoker.java
+++ b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsInvoker.java
@@ -26,6 +26,7 @@ import javax.ws.rs.core.UriInfo;
 
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.ExchangePattern;
+import org.apache.camel.ExchangeTimedOutException;
 import org.apache.camel.RuntimeCamelException;
 import org.apache.cxf.continuations.Continuation;
 import org.apache.cxf.continuations.ContinuationProvider;
@@ -90,13 +91,13 @@ public class CxfRsInvoker extends JAXRSInvoker {
                 // The continuation could be called before the suspend is called
                 continuation.suspend(endpoint.getContinuationTimeout());
                 cxfExchange.put(SUSPENED, Boolean.TRUE);
+                continuation.setObject(camelExchange);
                 cxfRsConsumer.getAsyncProcessor().process(camelExchange, new AsyncCallback() {
                     public void done(boolean doneSync) {
                         // make sure the continuation resume will not be called before the suspend method in other thread
                         synchronized (continuation) {
                             LOG.trace("Resuming continuation of exchangeId: {}", camelExchange.getExchangeId());
                             // resume processing after both, sync and async callbacks
-                            continuation.setObject(camelExchange);
                             continuation.resume();
                         }
                     }
@@ -111,6 +112,17 @@ public class CxfRsInvoker extends JAXRSInvoker {
                 } finally {
                     cxfRsConsumer.doneUoW(camelExchange);
                 }
+            } else {
+                if (!continuation.isPending()) {
+                    cxfExchange.put(SUSPENED, Boolean.FALSE);
+                    org.apache.camel.Exchange camelExchange = (org.apache.camel.Exchange)continuation.getObject();
+                    camelExchange.setException(new ExchangeTimedOutException(camelExchange, endpoint.getContinuationTimeout()));
+                    try {
+                        return returnResponse(cxfExchange, camelExchange);
+                    } finally {
+                        cxfRsConsumer.doneUoW(camelExchange);
+                    }
+                }
             }
         }
         return null;


[2/2] camel git commit: CAMEL-10171 memory leak when continuation expires

Posted by da...@apache.org.
CAMEL-10171 memory leak when continuation expires

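continuation.setObject(camelExchange) is now called earlier (right after
suspend), and a timeout exception is set on the camelExchange when the
continuation expires. Calling an isExpired method through the Continuation
interface was abandoned, and the issues below became invalid, as discussed
in CXF-7011: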

https://issues.apache.org/jira/browse/CXF-7002
https://issues.apache.org/jira/browse/CXF-7011

Instead, a block that checks the continuation state (not resumed and no
longer pending) means the same thing; see:

https://issues.apache.org/jira/browse/CXF-7011?focusedCommentId=15422696&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15422696


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/e822ae58
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/e822ae58
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/e822ae58

Branch: refs/heads/camel-2.17.x
Commit: e822ae58c43aec84cc4d1ff64df008ea22ddd910
Parents: 19f619e
Author: Önder sezgin <on...@gmail.com>
Authored: Tue Aug 16 22:22:46 2016 +0300
Committer: Claus Ibsen <da...@apache.org>
Committed: Wed Aug 17 09:52:05 2016 +0200

----------------------------------------------------------------------
 .../org/apache/camel/component/cxf/CxfConsumer.java   | 14 +++++++++++++-
 .../camel/component/cxf/jaxrs/CxfRsInvoker.java       | 14 +++++++++++++-
 2 files changed, 26 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/e822ae58/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java
index 11ec2f2..43561ba 100644
--- a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java
+++ b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java
@@ -26,6 +26,7 @@ import org.w3c.dom.Element;
 
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.ExchangePattern;
+import org.apache.camel.ExchangeTimedOutException;
 import org.apache.camel.Processor;
 import org.apache.camel.component.cxf.common.message.CxfConstants;
 import org.apache.camel.impl.DefaultConsumer;
@@ -149,6 +150,8 @@ public class CxfConsumer extends DefaultConsumer {
 
                     // The continuation could be called before the suspend is called
                     continuation.suspend(cxfEndpoint.getContinuationTimeout());
+                    
+                    continuation.setObject(camelExchange);
 
                     // use the asynchronous API to process the exchange
                     getAsyncProcessor().process(camelExchange, new AsyncCallback() {
@@ -157,7 +160,6 @@ public class CxfConsumer extends DefaultConsumer {
                             synchronized (continuation) {
                                 LOG.trace("Resuming continuation of exchangeId: {}", camelExchange.getExchangeId());
                                 // resume processing after both, sync and async callbacks
-                                continuation.setObject(camelExchange);
                                 continuation.resume();
                             }
                         }
@@ -171,6 +173,16 @@ public class CxfConsumer extends DefaultConsumer {
                         CxfConsumer.this.doneUoW(camelExchange);
                     }
 
+                } else if (!continuation.isResumed() && !continuation.isPending()) {
+                    org.apache.camel.Exchange camelExchange = (org.apache.camel.Exchange)continuation.getObject();
+                    try {
+                        if (!continuation.isPending()) {
+                            camelExchange.setException(new ExchangeTimedOutException(camelExchange, cxfEndpoint.getContinuationTimeout()));
+                        }
+                        setResponseBack(cxfExchange, camelExchange);
+                    } finally {
+                        CxfConsumer.this.doneUoW(camelExchange);
+                    }
                 }
             }
             return null;

http://git-wip-us.apache.org/repos/asf/camel/blob/e822ae58/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsInvoker.java
----------------------------------------------------------------------
diff --git a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsInvoker.java b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsInvoker.java
index fb999f0..01563d3 100644
--- a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsInvoker.java
+++ b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsInvoker.java
@@ -26,6 +26,7 @@ import javax.ws.rs.core.UriInfo;
 
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.ExchangePattern;
+import org.apache.camel.ExchangeTimedOutException;
 import org.apache.camel.RuntimeCamelException;
 import org.apache.cxf.continuations.Continuation;
 import org.apache.cxf.continuations.ContinuationProvider;
@@ -90,13 +91,13 @@ public class CxfRsInvoker extends JAXRSInvoker {
                 // The continuation could be called before the suspend is called
                 continuation.suspend(endpoint.getContinuationTimeout());
                 cxfExchange.put(SUSPENED, Boolean.TRUE);
+                continuation.setObject(camelExchange);
                 cxfRsConsumer.getAsyncProcessor().process(camelExchange, new AsyncCallback() {
                     public void done(boolean doneSync) {
                         // make sure the continuation resume will not be called before the suspend method in other thread
                         synchronized (continuation) {
                             LOG.trace("Resuming continuation of exchangeId: {}", camelExchange.getExchangeId());
                             // resume processing after both, sync and async callbacks
-                            continuation.setObject(camelExchange);
                             continuation.resume();
                         }
                     }
@@ -111,6 +112,17 @@ public class CxfRsInvoker extends JAXRSInvoker {
                 } finally {
                     cxfRsConsumer.doneUoW(camelExchange);
                 }
+            } else {
+                if (!continuation.isPending()) {
+                    cxfExchange.put(SUSPENED, Boolean.FALSE);
+                    org.apache.camel.Exchange camelExchange = (org.apache.camel.Exchange)continuation.getObject();
+                    camelExchange.setException(new ExchangeTimedOutException(camelExchange, endpoint.getContinuationTimeout()));
+                    try {
+                        return returnResponse(cxfExchange, camelExchange);
+                    } finally {
+                        cxfRsConsumer.doneUoW(camelExchange);
+                    }
+                }
             }
         }
         return null;