You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2016/05/03 14:29:04 UTC

[2/2] camel git commit: CAMEL-7884: camel-netty4-http should do a defensive copy of the netty bytebuf if the async routing engine kicks in, so any further processing can still read the stream of data. Netty http server will otherwise have closed the orig

CAMEL-7884: camel-netty4-http should do a defensive copy of the netty bytebuf if the async routing engine kicks in, so any further processing can still read the stream of data. Netty http server will otherwise have closed the original bytebuf.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/a7ecbafc
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/a7ecbafc
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/a7ecbafc

Branch: refs/heads/camel-2.17.x
Commit: a7ecbafc02b10bfac3fb99f21bebb407dd96a645
Parents: 6da2fd4
Author: Claus Ibsen <da...@apache.org>
Authored: Tue May 3 14:28:14 2016 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Tue May 3 14:28:54 2016 +0200

----------------------------------------------------------------------
 .../netty4/http/DefaultNettyHttpBinding.java    |  2 +
 .../http/NettyChannelBufferStreamCache.java     |  7 ++-
 ...ttyChannelBufferStreamCacheOnCompletion.java | 53 ++++++++++++++++++++
 3 files changed, 61 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/a7ecbafc/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/DefaultNettyHttpBinding.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/DefaultNettyHttpBinding.java b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/DefaultNettyHttpBinding.java
index 552f5de..fe87f53 100644
--- a/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/DefaultNettyHttpBinding.java
+++ b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/DefaultNettyHttpBinding.java
@@ -95,6 +95,8 @@ public class DefaultNettyHttpBinding implements NettyHttpBinding, Cloneable {
         } else {
             // turn the body into stream cached
             NettyChannelBufferStreamCache cache = new NettyChannelBufferStreamCache(request.content());
+            // add on completion to the cache which is needed for Camel to keep track of the lifecycle of the cache
+            exchange.addOnCompletion(new NettyChannelBufferStreamCacheOnCompletion(cache));
             answer.setBody(cache);
         }
         return answer;

http://git-wip-us.apache.org/repos/asf/camel/blob/a7ecbafc/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyChannelBufferStreamCache.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyChannelBufferStreamCache.java b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyChannelBufferStreamCache.java
index 70635f0..f92fc60 100644
--- a/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyChannelBufferStreamCache.java
+++ b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyChannelBufferStreamCache.java
@@ -32,7 +32,7 @@ import org.apache.camel.util.IOHelper;
  */
 public final class NettyChannelBufferStreamCache extends InputStream implements StreamCache {
 
-    private final ByteBuf buffer;
+    private ByteBuf buffer;
 
     public NettyChannelBufferStreamCache(ByteBuf buffer) {
         this.buffer = buffer;
@@ -101,4 +101,9 @@ public final class NettyChannelBufferStreamCache extends InputStream implements
     public long length() {
         return buffer.readableBytes();
     }
+
+    void defensiveCopyBuffer() {
+        // make a defensive copy of the buffer
+        this.buffer = buffer.copy();
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/a7ecbafc/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyChannelBufferStreamCacheOnCompletion.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyChannelBufferStreamCacheOnCompletion.java b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyChannelBufferStreamCacheOnCompletion.java
new file mode 100644
index 0000000..0cc51a4
--- /dev/null
+++ b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyChannelBufferStreamCacheOnCompletion.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty4.http;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.component.netty4.http.handlers.HttpServerChannelHandler;
+import org.apache.camel.support.SynchronizationAdapter;
+
+/**
+ * A {@link org.apache.camel.spi.Synchronization} to keep track of the unit of work on the current {@link Exchange}
+ * that has the {@link NettyChannelBufferStreamCache} as message body. This cache is wrapping the raw original
+ * Netty {@link io.netty.buffer.ByteBuf}. Because the Netty HTTP server ({@link HttpServerChannelHandler}) will
+ * close the {@link io.netty.buffer.ByteBuf} when Netty is complete processing the HttpMessage, then any further
+ * access to the cache will cause in a buffer unreadable. In the case of Camel async routing engine will
+ * handover the processing of the {@link Exchange} to another thread, then we need to keep track of this event
+ * so we can do a defensive copy of the netty {@link io.netty.buffer.ByteBuf} so Camel is able to read
+ * the content from other threads, while Netty has closed the original {@link io.netty.buffer.ByteBuf}.
+ */
+public class NettyChannelBufferStreamCacheOnCompletion extends SynchronizationAdapter {
+
+    private final NettyChannelBufferStreamCache cache;
+
+    public NettyChannelBufferStreamCacheOnCompletion(NettyChannelBufferStreamCache cache) {
+        this.cache = cache;
+    }
+
+    @Override
+    public void onDone(Exchange exchange) {
+        // okay netty is no longer being active, so we need to signal to the cache that its to preserve the buffer if still in need.
+        cache.defensiveCopyBuffer();
+    }
+
+    @Override
+    public boolean allowHandover() {
+        // do not allow handover, so we can do the defensive copy in the onDone method
+        return false;
+    }
+
+}