You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2016/05/03 15:47:09 UTC

[1/2] camel git commit: CAMEL-7884: Found a better way to handle the lifecycle of the netty buffer with a retain/release so we do not need the defensive copy.

Repository: camel
Updated Branches:
  refs/heads/camel-2.17.x a7ecbafc0 -> a3ac1fba0
  refs/heads/master 13bd2a7f1 -> 6e8d58bc3


CAMEL-7884: Found a better way to handle the lifecycle of the netty buffer with a retain/release so we do not need the defensive copy.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/6e8d58bc
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/6e8d58bc
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/6e8d58bc

Branch: refs/heads/master
Commit: 6e8d58bc350b9ce06f1309153304c159e4c03e74
Parents: 13bd2a7
Author: Claus Ibsen <da...@apache.org>
Authored: Tue May 3 15:45:06 2016 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Tue May 3 15:46:34 2016 +0200

----------------------------------------------------------------------
 .../http/NettyChannelBufferStreamCache.java     | 16 ++++++++++------
 ...ttyChannelBufferStreamCacheOnCompletion.java | 20 ++++----------------
 .../netty4/http/NettyHttpConfiguration.java     |  5 ++++-
 3 files changed, 18 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/6e8d58bc/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyChannelBufferStreamCache.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyChannelBufferStreamCache.java b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyChannelBufferStreamCache.java
index f92fc60..2d6c7be 100644
--- a/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyChannelBufferStreamCache.java
+++ b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyChannelBufferStreamCache.java
@@ -32,11 +32,12 @@ import org.apache.camel.util.IOHelper;
  */
 public final class NettyChannelBufferStreamCache extends InputStream implements StreamCache {
 
-    private ByteBuf buffer;
+    private final ByteBuf buffer;
 
     public NettyChannelBufferStreamCache(ByteBuf buffer) {
-        this.buffer = buffer;
-        buffer.markReaderIndex();
+        // retain the buffer so we keep it in use until we release it when we are done
+        this.buffer = buffer.retain();
+        this.buffer.markReaderIndex();
     }
 
     @Override
@@ -102,8 +103,11 @@ public final class NettyChannelBufferStreamCache extends InputStream implements
         return buffer.readableBytes();
     }
 
-    void defensiveCopyBuffer() {
-        // make a defensive copy of the buffer
-        this.buffer = buffer.copy();
+    /**
+     * Release the buffer when we are done using it.
+     */
+    public void release() {
+        buffer.release();
     }
+
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/6e8d58bc/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyChannelBufferStreamCacheOnCompletion.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyChannelBufferStreamCacheOnCompletion.java b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyChannelBufferStreamCacheOnCompletion.java
index 0cc51a4..343fd13 100644
--- a/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyChannelBufferStreamCacheOnCompletion.java
+++ b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyChannelBufferStreamCacheOnCompletion.java
@@ -17,18 +17,11 @@
 package org.apache.camel.component.netty4.http;
 
 import org.apache.camel.Exchange;
-import org.apache.camel.component.netty4.http.handlers.HttpServerChannelHandler;
 import org.apache.camel.support.SynchronizationAdapter;
 
 /**
- * A {@link org.apache.camel.spi.Synchronization} to keep track of the unit of work on the current {@link Exchange}
- * that has the {@link NettyChannelBufferStreamCache} as message body. This cache is wrapping the raw original
- * Netty {@link io.netty.buffer.ByteBuf}. Because the Netty HTTP server ({@link HttpServerChannelHandler}) will
- * close the {@link io.netty.buffer.ByteBuf} when Netty is complete processing the HttpMessage, then any further
- * access to the cache will cause in a buffer unreadable. In the case of Camel async routing engine will
- * handover the processing of the {@link Exchange} to another thread, then we need to keep track of this event
- * so we can do a defensive copy of the netty {@link io.netty.buffer.ByteBuf} so Camel is able to read
- * the content from other threads, while Netty has closed the original {@link io.netty.buffer.ByteBuf}.
+ * A {@link org.apache.camel.spi.Synchronization} to handle the lifecycle of the {@link NettyChannelBufferStreamCache}
+ * so the cache is released when the unit of work of the Exchange is done.
  */
 public class NettyChannelBufferStreamCacheOnCompletion extends SynchronizationAdapter {
 
@@ -40,14 +33,9 @@ public class NettyChannelBufferStreamCacheOnCompletion extends SynchronizationAd
 
     @Override
     public void onDone(Exchange exchange) {
-        // okay netty is no longer being active, so we need to signal to the cache that its to preserve the buffer if still in need.
-        cache.defensiveCopyBuffer();
+        // release the cache when we are done routing the Exchange
+        cache.release();
     }
 
-    @Override
-    public boolean allowHandover() {
-        // do not allow handover, so we can do the defensive copy in the onDone method
-        return false;
-    }
 
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/6e8d58bc/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyHttpConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyHttpConfiguration.java b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyHttpConfiguration.java
index 97c3a7f..784d007 100644
--- a/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyHttpConfiguration.java
+++ b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyHttpConfiguration.java
@@ -249,7 +249,10 @@ public class NettyHttpConfiguration extends NettyConfiguration {
      * can retrieve all data from the stream. However you can set this option to true when you for example need to
      * access the raw stream, such as streaming it directly to a file or other persistent store. Mind that
      * if you enable this option, then you cannot read the Netty stream multiple times out of the box, and you would
-     * need manually to reset the reader index on the Netty raw stream.
+     * need manually to reset the reader index on the Netty raw stream. Also Netty will auto-close the Netty stream
+     * when the Netty HTTP server is done processing, which means that if the asynchronous routing engine is in
+     * use then any asynchronous thread that may continue routing the {@link org.apache.camel.Exchange} may not
+     * be able to read the Netty stream, because Netty has closed it.
      */
     public void setDisableStreamCache(boolean disableStreamCache) {
         this.disableStreamCache = disableStreamCache;


[2/2] camel git commit: CAMEL-7884: Found a better way to handle the lifecycle of the netty buffer with a retain/release so we do not need the defensive copy.

Posted by da...@apache.org.
CAMEL-7884: Found a better way to handle the lifecycle of the netty buffer with a retain/release so we do not need the defensive copy.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/a3ac1fba
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/a3ac1fba
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/a3ac1fba

Branch: refs/heads/camel-2.17.x
Commit: a3ac1fba024db0f643841ec708095786fd503c94
Parents: a7ecbaf
Author: Claus Ibsen <da...@apache.org>
Authored: Tue May 3 15:45:06 2016 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Tue May 3 15:46:53 2016 +0200

----------------------------------------------------------------------
 .../http/NettyChannelBufferStreamCache.java     | 16 ++++++++++------
 ...ttyChannelBufferStreamCacheOnCompletion.java | 20 ++++----------------
 .../netty4/http/NettyHttpConfiguration.java     |  5 ++++-
 3 files changed, 18 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/a3ac1fba/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyChannelBufferStreamCache.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyChannelBufferStreamCache.java b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyChannelBufferStreamCache.java
index f92fc60..2d6c7be 100644
--- a/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyChannelBufferStreamCache.java
+++ b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyChannelBufferStreamCache.java
@@ -32,11 +32,12 @@ import org.apache.camel.util.IOHelper;
  */
 public final class NettyChannelBufferStreamCache extends InputStream implements StreamCache {
 
-    private ByteBuf buffer;
+    private final ByteBuf buffer;
 
     public NettyChannelBufferStreamCache(ByteBuf buffer) {
-        this.buffer = buffer;
-        buffer.markReaderIndex();
+        // retain the buffer so we keep it in use until we release it when we are done
+        this.buffer = buffer.retain();
+        this.buffer.markReaderIndex();
     }
 
     @Override
@@ -102,8 +103,11 @@ public final class NettyChannelBufferStreamCache extends InputStream implements
         return buffer.readableBytes();
     }
 
-    void defensiveCopyBuffer() {
-        // make a defensive copy of the buffer
-        this.buffer = buffer.copy();
+    /**
+     * Release the buffer when we are done using it.
+     */
+    public void release() {
+        buffer.release();
     }
+
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/a3ac1fba/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyChannelBufferStreamCacheOnCompletion.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyChannelBufferStreamCacheOnCompletion.java b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyChannelBufferStreamCacheOnCompletion.java
index 0cc51a4..343fd13 100644
--- a/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyChannelBufferStreamCacheOnCompletion.java
+++ b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyChannelBufferStreamCacheOnCompletion.java
@@ -17,18 +17,11 @@
 package org.apache.camel.component.netty4.http;
 
 import org.apache.camel.Exchange;
-import org.apache.camel.component.netty4.http.handlers.HttpServerChannelHandler;
 import org.apache.camel.support.SynchronizationAdapter;
 
 /**
- * A {@link org.apache.camel.spi.Synchronization} to keep track of the unit of work on the current {@link Exchange}
- * that has the {@link NettyChannelBufferStreamCache} as message body. This cache is wrapping the raw original
- * Netty {@link io.netty.buffer.ByteBuf}. Because the Netty HTTP server ({@link HttpServerChannelHandler}) will
- * close the {@link io.netty.buffer.ByteBuf} when Netty is complete processing the HttpMessage, then any further
- * access to the cache will cause in a buffer unreadable. In the case of Camel async routing engine will
- * handover the processing of the {@link Exchange} to another thread, then we need to keep track of this event
- * so we can do a defensive copy of the netty {@link io.netty.buffer.ByteBuf} so Camel is able to read
- * the content from other threads, while Netty has closed the original {@link io.netty.buffer.ByteBuf}.
+ * A {@link org.apache.camel.spi.Synchronization} to handle the lifecycle of the {@link NettyChannelBufferStreamCache}
+ * so the cache is released when the unit of work of the Exchange is done.
  */
 public class NettyChannelBufferStreamCacheOnCompletion extends SynchronizationAdapter {
 
@@ -40,14 +33,9 @@ public class NettyChannelBufferStreamCacheOnCompletion extends SynchronizationAd
 
     @Override
     public void onDone(Exchange exchange) {
-        // okay netty is no longer being active, so we need to signal to the cache that its to preserve the buffer if still in need.
-        cache.defensiveCopyBuffer();
+        // release the cache when we are done routing the Exchange
+        cache.release();
     }
 
-    @Override
-    public boolean allowHandover() {
-        // do not allow handover, so we can do the defensive copy in the onDone method
-        return false;
-    }
 
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/a3ac1fba/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyHttpConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyHttpConfiguration.java b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyHttpConfiguration.java
index 97c3a7f..784d007 100644
--- a/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyHttpConfiguration.java
+++ b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyHttpConfiguration.java
@@ -249,7 +249,10 @@ public class NettyHttpConfiguration extends NettyConfiguration {
      * can retrieve all data from the stream. However you can set this option to true when you for example need to
      * access the raw stream, such as streaming it directly to a file or other persistent store. Mind that
      * if you enable this option, then you cannot read the Netty stream multiple times out of the box, and you would
-     * need manually to reset the reader index on the Netty raw stream.
+     * need manually to reset the reader index on the Netty raw stream. Also Netty will auto-close the Netty stream
+     * when the Netty HTTP server is done processing, which means that if the asynchronous routing engine is in
+     * use then any asynchronous thread that may continue routing the {@link org.apache.camel.Exchange} may not
+     * be able to read the Netty stream, because Netty has closed it.
      */
     public void setDisableStreamCache(boolean disableStreamCache) {
         this.disableStreamCache = disableStreamCache;