You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2016/05/04 12:08:10 UTC

[1/8] camel git commit: CAMEL-9040: Fixed netty leak in http4 producer

Repository: camel
Updated Branches:
  refs/heads/master b4a6ad4cf -> 8efbf0303


CAMEL-9040: Fixed netty leak in http4 producer


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/8efbf030
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/8efbf030
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/8efbf030

Branch: refs/heads/master
Commit: 8efbf0303949c0f0e4e02f9309f6c35c24ccfa9e
Parents: 9fc3e43
Author: Claus Ibsen <da...@apache.org>
Authored: Wed May 4 14:06:26 2016 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Wed May 4 14:07:56 2016 +0200

----------------------------------------------------------------------
 components/camel-netty4-http/run-test-leak.sh   | 31 ++++++++++++++++++++
 .../netty4/http/DefaultNettyHttpBinding.java    |  9 ++----
 2 files changed, 33 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/8efbf030/components/camel-netty4-http/run-test-leak.sh
----------------------------------------------------------------------
diff --git a/components/camel-netty4-http/run-test-leak.sh b/components/camel-netty4-http/run-test-leak.sh
new file mode 100755
index 0000000..409a814
--- /dev/null
+++ b/components/camel-netty4-http/run-test-leak.sh
@@ -0,0 +1,31 @@
+#!/bin/sh
+
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+
+echo 'Running tests with Netty leak detection ...'
+mvn clean install -Dio.netty.leakDetectionLevel=paranoid -Dio.netty.leakDetection.maxRecords=20
+
+echo 'Checking log file if there is any leaks ...'
+
+if grep LEAK target/camel-netty4-http-test.log; then
+    echo 'LEAK found'
+    exit 1
+else
+    echo 'No LEAK found'
+    exit 0
+fi

http://git-wip-us.apache.org/repos/asf/camel/blob/8efbf030/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/DefaultNettyHttpBinding.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/DefaultNettyHttpBinding.java b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/DefaultNettyHttpBinding.java
index 9f94ea0..6364f9a 100644
--- a/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/DefaultNettyHttpBinding.java
+++ b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/DefaultNettyHttpBinding.java
@@ -16,9 +16,7 @@
  */
 package org.apache.camel.component.netty4.http;
 
-import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
-import java.io.InputStream;
 import java.io.ObjectOutputStream;
 import java.io.PrintWriter;
 import java.io.StringWriter;
@@ -46,11 +44,9 @@ import org.apache.camel.Exchange;
 import org.apache.camel.Message;
 import org.apache.camel.NoTypeConversionAvailableException;
 import org.apache.camel.RuntimeCamelException;
-import org.apache.camel.StreamCache;
 import org.apache.camel.TypeConverter;
 import org.apache.camel.component.netty4.NettyConstants;
 import org.apache.camel.component.netty4.NettyConverter;
-import org.apache.camel.converter.stream.ByteArrayInputStreamCache;
 import org.apache.camel.spi.HeaderFilterStrategy;
 import org.apache.camel.util.ExchangeHelper;
 import org.apache.camel.util.IOHelper;
@@ -276,12 +272,11 @@ public class DefaultNettyHttpBinding implements NettyHttpBinding, Cloneable {
             // keep the body as is, and use type converters
             answer.setBody(response.content());
         } else {
-            // stores as byte array as the netty ByteBuf will be freed when the producer is done,
-            // and then we can no longer access the message body
+            // stores as byte array as the netty ByteBuf will be freed when the producer is done, and then we can no longer access the message body
             response.retain();
             try {
                 byte[] bytes = exchange.getContext().getTypeConverter().convertTo(byte[].class, exchange, response.content());
-                answer.setBody(new ByteArrayInputStreamCache(new ByteArrayInputStream(bytes)));
+                answer.setBody(bytes);
             } finally {
                 response.release();
             }


[6/8] camel git commit: CAMEL-9040: Fixed netty leak

Posted by da...@apache.org.
CAMEL-9040: Fixed netty leak


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/16c5e34b
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/16c5e34b
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/16c5e34b

Branch: refs/heads/master
Commit: 16c5e34b6c6d0f8f48ba51f159dd9897576847d4
Parents: e7782eb
Author: Claus Ibsen <da...@apache.org>
Authored: Wed May 4 11:04:31 2016 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Wed May 4 14:07:56 2016 +0200

----------------------------------------------------------------------
 .../netty4/http/DefaultNettyHttpBinding.java    |  8 +++++++-
 .../http/HttpClientInitializerFactory.java      |  2 --
 .../http/HttpServerInitializerFactory.java      |  3 +--
 .../HttpServerSharedInitializerFactory.java     |  2 +-
 ...ttyChannelBufferStreamCacheOnCompletion.java |  1 -
 .../netty4/http/NettyHttpProducer.java          |  1 -
 .../http/handlers/HttpServerChannelHandler.java | 21 +++++++++++---------
 7 files changed, 21 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/16c5e34b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/DefaultNettyHttpBinding.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/DefaultNettyHttpBinding.java b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/DefaultNettyHttpBinding.java
index fe87f53..e3a28f7 100644
--- a/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/DefaultNettyHttpBinding.java
+++ b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/DefaultNettyHttpBinding.java
@@ -211,7 +211,13 @@ public class DefaultNettyHttpBinding implements NettyHttpBinding, Cloneable {
             String charset = "UTF-8";
 
             // Push POST form params into the headers to retain compatibility with DefaultHttpBinding
-            String body = request.content().toString(Charset.forName(charset));
+            String body = null;
+            ByteBuf buffer = request.content();
+            try {
+                body = buffer.toString(Charset.forName(charset));
+            } finally {
+                buffer.release();
+            }
             if (ObjectHelper.isNotEmpty(body)) {
                 for (String param : body.split("&")) {
                     String[] pair = param.split("=", 2);

http://git-wip-us.apache.org/repos/asf/camel/blob/16c5e34b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/HttpClientInitializerFactory.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/HttpClientInitializerFactory.java b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/HttpClientInitializerFactory.java
index 04bae3d..b5775a8 100644
--- a/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/HttpClientInitializerFactory.java
+++ b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/HttpClientInitializerFactory.java
@@ -118,8 +118,6 @@ public class HttpClientInitializerFactory extends ClientInitializerFactory {
        
         // handler to route Camel messages
         pipeline.addLast("handler", new HttpClientChannelHandler(producer));
-
-        
     }
 
     private SSLContext createSSLContext(NettyProducer producer) throws Exception {

http://git-wip-us.apache.org/repos/asf/camel/blob/16c5e34b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/HttpServerInitializerFactory.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/HttpServerInitializerFactory.java b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/HttpServerInitializerFactory.java
index 84a34c2..4ed4322 100644
--- a/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/HttpServerInitializerFactory.java
+++ b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/HttpServerInitializerFactory.java
@@ -97,8 +97,6 @@ public class HttpServerInitializerFactory extends ServerInitializerFactory {
             }
             pipeline.addLast("decoder-" + x, decoder);
         }
-        pipeline.addLast("aggregator", new HttpObjectAggregator(configuration.getChunkedMaxContentLength()));
-
         pipeline.addLast("encoder", new HttpResponseEncoder());
         List<ChannelHandler> encoders = consumer.getConfiguration().getEncoders();
         for (int x = 0; x < encoders.size(); x++) {
@@ -109,6 +107,7 @@ public class HttpServerInitializerFactory extends ServerInitializerFactory {
             }
             pipeline.addLast("encoder-" + x, encoder);
         }
+        pipeline.addLast("aggregator", new HttpObjectAggregator(configuration.getChunkedMaxContentLength()));
         if (supportCompressed()) {
             pipeline.addLast("deflater", new HttpContentCompressor());
         }

http://git-wip-us.apache.org/repos/asf/camel/blob/16c5e34b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/HttpServerSharedInitializerFactory.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/HttpServerSharedInitializerFactory.java b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/HttpServerSharedInitializerFactory.java
index 71e9129..698cd15 100644
--- a/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/HttpServerSharedInitializerFactory.java
+++ b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/HttpServerSharedInitializerFactory.java
@@ -83,10 +83,10 @@ public class HttpServerSharedInitializerFactory extends HttpServerInitializerFac
         }
 
         pipeline.addLast("decoder", new HttpRequestDecoder(409, configuration.getMaxHeaderSize(), 8192));
+        pipeline.addLast("encoder", new HttpResponseEncoder());
         if (configuration.isChunked()) {
             pipeline.addLast("aggregator", new HttpObjectAggregator(configuration.getChunkedMaxContentLength()));
         }
-        pipeline.addLast("encoder", new HttpResponseEncoder());
         if (configuration.isCompression()) {
             pipeline.addLast("deflater", new HttpContentCompressor());
         }

http://git-wip-us.apache.org/repos/asf/camel/blob/16c5e34b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyChannelBufferStreamCacheOnCompletion.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyChannelBufferStreamCacheOnCompletion.java b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyChannelBufferStreamCacheOnCompletion.java
index 343fd13..37654da 100644
--- a/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyChannelBufferStreamCacheOnCompletion.java
+++ b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyChannelBufferStreamCacheOnCompletion.java
@@ -37,5 +37,4 @@ public class NettyChannelBufferStreamCacheOnCompletion extends SynchronizationAd
         cache.release();
     }
 
-
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/16c5e34b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyHttpProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyHttpProducer.java b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyHttpProducer.java
index 58344c2..ced0bdc 100644
--- a/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyHttpProducer.java
+++ b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyHttpProducer.java
@@ -96,7 +96,6 @@ public class NettyHttpProducer extends NettyProducer {
                 if (nettyMessage != null) {
                     FullHttpResponse response = nettyMessage.getHttpResponse();
                     // Need to retain the ByteBuffer for producer to consumer
-                    // TODO Remove this part of ByteBuffer right away
                     if (response != null) {
                         response.content().retain();
                         // the actual url is stored on the IN message in the getRequestBody method as its accessed on-demand

http://git-wip-us.apache.org/repos/asf/camel/blob/16c5e34b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/handlers/HttpServerChannelHandler.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/handlers/HttpServerChannelHandler.java b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/handlers/HttpServerChannelHandler.java
index 39b2dda..d528675 100644
--- a/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/handlers/HttpServerChannelHandler.java
+++ b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/handlers/HttpServerChannelHandler.java
@@ -21,7 +21,6 @@ import java.nio.channels.ClosedChannelException;
 import java.nio.charset.Charset;
 import java.util.Iterator;
 import java.util.Locale;
-
 import javax.security.auth.Subject;
 import javax.security.auth.login.LoginException;
 
@@ -43,9 +42,9 @@ import org.apache.camel.component.netty4.http.NettyHttpSecurityConfiguration;
 import org.apache.camel.component.netty4.http.SecurityAuthenticator;
 import org.apache.camel.util.CamelLogger;
 import org.apache.camel.util.ObjectHelper;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import static io.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST;
 import static io.netty.handler.codec.http.HttpResponseStatus.METHOD_NOT_ALLOWED;
 import static io.netty.handler.codec.http.HttpResponseStatus.OK;
@@ -247,13 +246,17 @@ public class HttpServerChannelHandler extends ServerChannelHandler {
                     // the decoded part is base64 encoded, so we need to decode that
                     ByteBuf buf = NettyConverter.toByteBuffer(decoded.getBytes());
                     ByteBuf out = Base64.decode(buf);
-                    String userAndPw = out.toString(Charset.defaultCharset());
-                    String username = ObjectHelper.before(userAndPw, ":");
-                    String password = ObjectHelper.after(userAndPw, ":");
-                    HttpPrincipal principal = new HttpPrincipal(username, password);
-
-                    LOG.debug("Extracted Basic Auth principal from HTTP header: {}", principal);
-                    return principal;
+                    try {
+                        String userAndPw = out.toString(Charset.defaultCharset());
+                        String username = ObjectHelper.before(userAndPw, ":");
+                        String password = ObjectHelper.after(userAndPw, ":");
+                        HttpPrincipal principal = new HttpPrincipal(username, password);
+                        LOG.debug("Extracted Basic Auth principal from HTTP header: {}", principal);
+                        return principal;
+                    } finally {
+                        buf.release();
+                        out.release();
+                    }
                 }
             }
         }


[2/8] camel git commit: CAMEL-9040: Fixed netty leak in http4 producer

Posted by da...@apache.org.
CAMEL-9040: Fixed netty leak in http4 producer


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/3563f6e6
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/3563f6e6
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/3563f6e6

Branch: refs/heads/master
Commit: 3563f6e6a4cbea2841cdd6e780156683aa575b14
Parents: 74a7020
Author: Claus Ibsen <da...@apache.org>
Authored: Wed May 4 13:40:38 2016 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Wed May 4 14:07:56 2016 +0200

----------------------------------------------------------------------
 .../netty4/http/DefaultNettyHttpBinding.java    | 20 +++++-
 .../http/NettyHttpOperationFailedException.java | 28 ++++++--
 .../netty4/http/NettyHttpProducer.java          | 67 ++++++++++++++------
 .../netty4/http/NettyHttp500ErrorTest.java      |  2 +-
 ...yHttp500ErrorThrowExceptionOnServerTest.java |  3 +-
 .../netty4/http/NettyHttpHandle404Test.java     |  4 +-
 .../netty4/http/NettyHttpOkStatusCodeTest.java  |  3 +-
 .../netty4/http/NettyHttpReturnFaultTest.java   |  3 +-
 .../netty4/handlers/ClientChannelHandler.java   |  2 +
 9 files changed, 97 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/3563f6e6/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/DefaultNettyHttpBinding.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/DefaultNettyHttpBinding.java b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/DefaultNettyHttpBinding.java
index e3a28f7..f8cf4a3 100644
--- a/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/DefaultNettyHttpBinding.java
+++ b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/DefaultNettyHttpBinding.java
@@ -17,6 +17,7 @@
 package org.apache.camel.component.netty4.http;
 
 import java.io.ByteArrayOutputStream;
+import java.io.InputStream;
 import java.io.ObjectOutputStream;
 import java.io.PrintWriter;
 import java.io.StringWriter;
@@ -44,6 +45,7 @@ import org.apache.camel.Exchange;
 import org.apache.camel.Message;
 import org.apache.camel.NoTypeConversionAvailableException;
 import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.StreamCache;
 import org.apache.camel.TypeConverter;
 import org.apache.camel.component.netty4.NettyConstants;
 import org.apache.camel.component.netty4.NettyConverter;
@@ -268,8 +270,21 @@ public class DefaultNettyHttpBinding implements NettyHttpBinding, Cloneable {
             populateCamelHeaders(response, answer.getHeaders(), exchange, configuration);
         }
 
-        // keep the body as is, and use type converters
-        answer.setBody(response.content());
+        if (configuration.isDisableStreamCache()) {
+            // keep the body as is, and use type converters
+            answer.setBody(response.content());
+        } else {
+            // stores as byte array as the netty ByteBuf will be freedy when the producer is done, and then we
+            // can no longer access the message body
+            response.retain();
+            try {
+                byte[] bytes = exchange.getContext().getTypeConverter().convertTo(byte[].class, exchange, response.content());
+                answer.setBody(bytes);
+                // TODO: use stream caching
+            } finally {
+                response.release();
+            }
+        }
         return answer;
     }
 
@@ -320,7 +335,6 @@ public class DefaultNettyHttpBinding implements NettyHttpBinding, Cloneable {
         
         LOG.trace("HTTP Status Code: {}", code);
 
-
         // if there was an exception then use that as body
         if (cause != null) {
             if (configuration.isTransferException()) {

http://git-wip-us.apache.org/repos/asf/camel/blob/3563f6e6/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyHttpOperationFailedException.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyHttpOperationFailedException.java b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyHttpOperationFailedException.java
index abea14d..d75ee31 100644
--- a/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyHttpOperationFailedException.java
+++ b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyHttpOperationFailedException.java
@@ -5,9 +5,9 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -16,11 +16,13 @@
  */
 package org.apache.camel.component.netty4.http;
 
+import java.io.UnsupportedEncodingException;
+
 import io.netty.handler.codec.http.HttpContent;
 import org.apache.camel.CamelException;
+import org.apache.camel.component.netty4.NettyConverter;
 import org.apache.camel.util.ObjectHelper;
 
-
 /**
  * Exception when a Netty HTTP operation failed.
  */
@@ -31,6 +33,7 @@ public class NettyHttpOperationFailedException extends CamelException {
     private final int statusCode;
     private final String statusText;
     private final transient HttpContent content;
+    private String contentAsString;
 
     public NettyHttpOperationFailedException(String uri, int statusCode, String statusText, String location, HttpContent content) {
         super("Netty HTTP operation failed invoking " + uri + " with statusCode: " + statusCode + (location != null ? ", redirectLocation: " + location : ""));
@@ -39,6 +42,11 @@ public class NettyHttpOperationFailedException extends CamelException {
         this.statusText = statusText;
         this.redirectLocation = location;
         this.content = content;
+        try {
+            this.contentAsString = NettyConverter.toString(content.content(), null);
+        } catch (UnsupportedEncodingException e) {
+            // ignore
+        }
     }
 
     public String getUri() {
@@ -70,8 +78,20 @@ public class NettyHttpOperationFailedException extends CamelException {
      * <p/>
      * Notice this may be <tt>null</tt> if this exception has been serialized,
      * as the {@link HttpContent} instance is marked as transient in this class.
+     *
+     * @deprecated use getContentAsString();
      */
+    @Deprecated
     public HttpContent getHttpContent() {
         return content;
     }
+
+    /**
+     * Gets the HTTP content as a String
+     * <p/>
+     * Notice this may be <tt>null</tt> if it was not possible to read the content
+     */
+    public String getContentAsString() {
+        return contentAsString;
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/3563f6e6/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyHttpProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyHttpProducer.java b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyHttpProducer.java
index ced0bdc..37e7ad8 100644
--- a/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyHttpProducer.java
+++ b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyHttpProducer.java
@@ -21,11 +21,14 @@ import java.net.URI;
 import io.netty.handler.codec.http.FullHttpResponse;
 import io.netty.handler.codec.http.HttpHeaders;
 import io.netty.handler.codec.http.HttpRequest;
+import io.netty.util.ReferenceCountUtil;
+import io.netty.util.ReferenceCounted;
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
 import org.apache.camel.component.netty4.NettyConfiguration;
 import org.apache.camel.component.netty4.NettyConstants;
 import org.apache.camel.component.netty4.NettyProducer;
+import org.apache.camel.support.SynchronizationAdapter;
 
 
 /**
@@ -58,7 +61,7 @@ public class NettyHttpProducer extends NettyProducer {
         String uri = NettyHttpHelper.createURL(exchange, getEndpoint());
         URI u = NettyHttpHelper.createURI(exchange, uri, getEndpoint());
 
-        HttpRequest request = getEndpoint().getNettyHttpBinding().toNettyRequest(exchange.getIn(), u.toString(), getConfiguration());
+        final HttpRequest request = getEndpoint().getNettyHttpBinding().toNettyRequest(exchange.getIn(), u.toString(), getConfiguration());
         String actualUri = request.getUri();
         exchange.getIn().setHeader(Exchange.HTTP_URL, actualUri);
         // Need to check if we need to close the connection or not
@@ -71,6 +74,19 @@ public class NettyHttpProducer extends NettyProducer {
             exchange.getIn().removeHeader("host");
         }
 
+        // need to release the request when we are done
+        exchange.addOnCompletion(new SynchronizationAdapter(){
+            @Override
+            public void onDone(Exchange exchange) {
+                if (request instanceof ReferenceCounted) {
+                    if (((ReferenceCounted) request).refCnt() > 0) {
+                        log.debug("Releasing Netty HttpRequest ByteBuf");
+                        ReferenceCountUtil.release(request);
+                    }
+                }
+            }
+        });
+
         return request;
     }
 
@@ -92,23 +108,38 @@ public class NettyHttpProducer extends NettyProducer {
         @Override
         public void done(boolean doneSync) {
             try {
-                NettyHttpMessage nettyMessage = exchange.hasOut() ? exchange.getOut(NettyHttpMessage.class) : exchange.getIn(NettyHttpMessage.class);
-                if (nettyMessage != null) {
-                    FullHttpResponse response = nettyMessage.getHttpResponse();
-                    // Need to retain the ByteBuffer for producer to consumer
-                    if (response != null) {
-                        response.content().retain();
-                        // the actual url is stored on the IN message in the getRequestBody method as its accessed on-demand
-                        String actualUrl = exchange.getIn().getHeader(Exchange.HTTP_URL, String.class);
-                        int code = response.getStatus() != null ? response.getStatus().code() : -1;
-                        log.debug("Http responseCode: {}", code);
-
-                        // if there was a http error code then check if we should throw an exception
-                        boolean ok = NettyHttpHelper.isStatusCodeOk(code, configuration.getOkStatusCodeRange());
-                        if (!ok && getConfiguration().isThrowExceptionOnFailure()) {
-                            // operation failed so populate exception to throw
-                            Exception cause = NettyHttpHelper.populateNettyHttpOperationFailedException(exchange, actualUrl, response, code, getConfiguration().isTransferException());
-                            exchange.setException(cause);
+                // only handle when we are done asynchronous as then the netty producer is done sending, and we have a response
+                if (!doneSync) {
+                    NettyHttpMessage nettyMessage = exchange.hasOut() ? exchange.getOut(NettyHttpMessage.class) : exchange.getIn(NettyHttpMessage.class);
+                    if (nettyMessage != null) {
+                        final FullHttpResponse response = nettyMessage.getHttpResponse();
+                        // Need to retain the ByteBuffer for producer to consumer
+                        if (response != null) {
+                            response.content().retain();
+
+                            // need to release the response when we are done
+                            exchange.addOnCompletion(new SynchronizationAdapter(){
+                                @Override
+                                public void onDone(Exchange exchange) {
+                                    if (response.refCnt() > 0) {
+                                        log.debug("Releasing Netty HttpResonse ByteBuf");
+                                        ReferenceCountUtil.release(response);
+                                    }
+                                }
+                            });
+
+                            // the actual url is stored on the IN message in the getRequestBody method as its accessed on-demand
+                            String actualUrl = exchange.getIn().getHeader(Exchange.HTTP_URL, String.class);
+                            int code = response.getStatus() != null ? response.getStatus().code() : -1;
+                            log.debug("Http responseCode: {}", code);
+
+                            // if there was a http error code then check if we should throw an exception
+                            boolean ok = NettyHttpHelper.isStatusCodeOk(code, configuration.getOkStatusCodeRange());
+                            if (!ok && getConfiguration().isThrowExceptionOnFailure()) {
+                                // operation failed so populate exception to throw
+                                Exception cause = NettyHttpHelper.populateNettyHttpOperationFailedException(exchange, actualUrl, response, code, getConfiguration().isTransferException());
+                                exchange.setException(cause);
+                            }
                         }
                     }
                 }

http://git-wip-us.apache.org/repos/asf/camel/blob/3563f6e6/components/camel-netty4-http/src/test/java/org/apache/camel/component/netty4/http/NettyHttp500ErrorTest.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4-http/src/test/java/org/apache/camel/component/netty4/http/NettyHttp500ErrorTest.java b/components/camel-netty4-http/src/test/java/org/apache/camel/component/netty4/http/NettyHttp500ErrorTest.java
index f895fac..4c3b799 100644
--- a/components/camel-netty4-http/src/test/java/org/apache/camel/component/netty4/http/NettyHttp500ErrorTest.java
+++ b/components/camel-netty4-http/src/test/java/org/apache/camel/component/netty4/http/NettyHttp500ErrorTest.java
@@ -34,7 +34,7 @@ public class NettyHttp500ErrorTest extends BaseNettyTest {
         } catch (CamelExecutionException e) {
             NettyHttpOperationFailedException cause = assertIsInstanceOf(NettyHttpOperationFailedException.class, e.getCause());
             assertEquals(500, cause.getStatusCode());
-            assertEquals("Camel cannot do this", context.getTypeConverter().convertTo(String.class, cause.getHttpContent().content()));
+            assertEquals("Camel cannot do this", cause.getContentAsString());
         }
 
         assertMockEndpointsSatisfied();

http://git-wip-us.apache.org/repos/asf/camel/blob/3563f6e6/components/camel-netty4-http/src/test/java/org/apache/camel/component/netty4/http/NettyHttp500ErrorThrowExceptionOnServerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4-http/src/test/java/org/apache/camel/component/netty4/http/NettyHttp500ErrorThrowExceptionOnServerTest.java b/components/camel-netty4-http/src/test/java/org/apache/camel/component/netty4/http/NettyHttp500ErrorThrowExceptionOnServerTest.java
index 13c7f68..450009b 100644
--- a/components/camel-netty4-http/src/test/java/org/apache/camel/component/netty4/http/NettyHttp500ErrorThrowExceptionOnServerTest.java
+++ b/components/camel-netty4-http/src/test/java/org/apache/camel/component/netty4/http/NettyHttp500ErrorThrowExceptionOnServerTest.java
@@ -32,11 +32,10 @@ public class NettyHttp500ErrorThrowExceptionOnServerTest extends BaseNettyTest {
         } catch (CamelExecutionException e) {
             NettyHttpOperationFailedException cause = assertIsInstanceOf(NettyHttpOperationFailedException.class, e.getCause());
             assertEquals(500, cause.getStatusCode());
-            String trace = context.getTypeConverter().convertTo(String.class, cause.getHttpContent().content());
+            String trace = cause.getContentAsString();
             assertNotNull(trace);
             assertTrue(trace.startsWith("java.lang.IllegalArgumentException: Camel cannot do this"));
             assertEquals("http://localhost:" + getPort() + "/foo", cause.getUri());
-            cause.getHttpContent().content().release();
         }
 
         assertMockEndpointsSatisfied();

http://git-wip-us.apache.org/repos/asf/camel/blob/3563f6e6/components/camel-netty4-http/src/test/java/org/apache/camel/component/netty4/http/NettyHttpHandle404Test.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4-http/src/test/java/org/apache/camel/component/netty4/http/NettyHttpHandle404Test.java b/components/camel-netty4-http/src/test/java/org/apache/camel/component/netty4/http/NettyHttpHandle404Test.java
index f19690f..dd17a23 100644
--- a/components/camel-netty4-http/src/test/java/org/apache/camel/component/netty4/http/NettyHttpHandle404Test.java
+++ b/components/camel-netty4-http/src/test/java/org/apache/camel/component/netty4/http/NettyHttpHandle404Test.java
@@ -71,9 +71,7 @@ public class NettyHttpHandle404Test extends BaseNettyTest {
                                 // instead as an exception that will get thrown and thus the route breaks
                                 NettyHttpOperationFailedException cause = exchange.getProperty(Exchange.EXCEPTION_CAUGHT, NettyHttpOperationFailedException.class);
                                 exchange.getOut().setHeader(Exchange.HTTP_RESPONSE_CODE, cause.getStatusCode());
-                                exchange.getOut().setBody(cause.getHttpContent().content().toString(Charset.defaultCharset()));
-                                // release as no longer in use
-                                cause.getHttpContent().content().release();
+                                exchange.getOut().setBody(cause.getContentAsString());
                             }
                         })
                         .end();

http://git-wip-us.apache.org/repos/asf/camel/blob/3563f6e6/components/camel-netty4-http/src/test/java/org/apache/camel/component/netty4/http/NettyHttpOkStatusCodeTest.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4-http/src/test/java/org/apache/camel/component/netty4/http/NettyHttpOkStatusCodeTest.java b/components/camel-netty4-http/src/test/java/org/apache/camel/component/netty4/http/NettyHttpOkStatusCodeTest.java
index 0a0fa36..3aef7f3 100644
--- a/components/camel-netty4-http/src/test/java/org/apache/camel/component/netty4/http/NettyHttpOkStatusCodeTest.java
+++ b/components/camel-netty4-http/src/test/java/org/apache/camel/component/netty4/http/NettyHttpOkStatusCodeTest.java
@@ -32,9 +32,8 @@ public class NettyHttpOkStatusCodeTest extends BaseNettyTest {
         } catch (CamelExecutionException e) {
             NettyHttpOperationFailedException cause = assertIsInstanceOf(NettyHttpOperationFailedException.class, e.getCause());
             assertEquals(209, cause.getStatusCode());
-            String body = context.getTypeConverter().convertTo(String.class, cause.getHttpContent().content());
+            String body = cause.getContentAsString();
             assertEquals("Not allowed", body);
-            cause.getHttpContent().content().release();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/3563f6e6/components/camel-netty4-http/src/test/java/org/apache/camel/component/netty4/http/NettyHttpReturnFaultTest.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4-http/src/test/java/org/apache/camel/component/netty4/http/NettyHttpReturnFaultTest.java b/components/camel-netty4-http/src/test/java/org/apache/camel/component/netty4/http/NettyHttpReturnFaultTest.java
index 9b8def5..fb452a0 100644
--- a/components/camel-netty4-http/src/test/java/org/apache/camel/component/netty4/http/NettyHttpReturnFaultTest.java
+++ b/components/camel-netty4-http/src/test/java/org/apache/camel/component/netty4/http/NettyHttpReturnFaultTest.java
@@ -36,9 +36,8 @@ public class NettyHttpReturnFaultTest extends BaseNettyTest {
         NettyHttpOperationFailedException exception = exchange.getException(NettyHttpOperationFailedException.class);
         assertNotNull(exception);
         assertEquals(500, exception.getStatusCode());
-        String message = context.getTypeConverter().convertTo(String.class, exception.getHttpContent().content());
+        String message = exception.getContentAsString();
         assertEquals("This is a fault", message);
-        exception.getHttpContent().content().release();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/camel/blob/3563f6e6/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ClientChannelHandler.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ClientChannelHandler.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ClientChannelHandler.java
index b9a2a17..60db52f 100644
--- a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ClientChannelHandler.java
+++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ClientChannelHandler.java
@@ -94,6 +94,8 @@ public class ClientChannelHandler extends SimpleChannelInboundHandler<Object> {
             // signal callback
             callback.done(false);
         }
+
+        super.exceptionCaught(ctx, cause);
     }
 
     @Override


[5/8] camel git commit: CAMEL-9940: ProducerTemplate - Make extract result set part of UoW

Posted by da...@apache.org.
CAMEL-9940: ProducerTemplate - Make extract result set part of UoW


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/1cca6b7c
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/1cca6b7c
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/1cca6b7c

Branch: refs/heads/master
Commit: 1cca6b7c1d5445c3d75ae6b1d3abbf6c1568eafa
Parents: b4a6ad4
Author: Claus Ibsen <da...@apache.org>
Authored: Wed May 4 09:04:17 2016 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Wed May 4 14:07:56 2016 +0200

----------------------------------------------------------------------
 .../camel/impl/DefaultProducerTemplate.java     | 40 ++++++++++++---
 .../org/apache/camel/impl/ProducerCache.java    | 52 ++++++++++++++++----
 2 files changed, 76 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/1cca6b7c/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java
index aee4973..a5c8867 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java
@@ -32,6 +32,7 @@ import org.apache.camel.Message;
 import org.apache.camel.NoSuchEndpointException;
 import org.apache.camel.Processor;
 import org.apache.camel.ProducerTemplate;
+import org.apache.camel.processor.ConvertBodyProcessor;
 import org.apache.camel.spi.Synchronization;
 import org.apache.camel.support.ServiceSupport;
 import org.apache.camel.util.CamelContextHelper;
@@ -324,37 +325,44 @@ public class DefaultProducerTemplate extends ServiceSupport implements ProducerT
     }
 
     public <T> T requestBody(Object body, Class<T> type) {
-        Object answer = requestBody(body);
+        Exchange exchange = producerCache.send(getMandatoryDefaultEndpoint(), ExchangePattern.InOut, createSetBodyProcessor(body), createConvertBodyProcessor(type));
+        Object answer = extractResultBody(exchange);
         return camelContext.getTypeConverter().convertTo(type, answer);
     }
 
     public <T> T requestBody(Endpoint endpoint, Object body, Class<T> type) {
-        Object answer = requestBody(endpoint, body);
+        Exchange exchange = producerCache.send(endpoint, ExchangePattern.InOut, createSetBodyProcessor(body), createConvertBodyProcessor(type));
+        Object answer = extractResultBody(exchange);
         return camelContext.getTypeConverter().convertTo(type, answer);
     }
 
     public <T> T requestBody(String endpointUri, Object body, Class<T> type) {
-        Object answer = requestBody(endpointUri, body);
+        Exchange exchange = producerCache.send(resolveMandatoryEndpoint(endpointUri), ExchangePattern.InOut, createSetBodyProcessor(body), createConvertBodyProcessor(type));
+        Object answer = extractResultBody(exchange);
         return camelContext.getTypeConverter().convertTo(type, answer);
     }
 
     public <T> T requestBodyAndHeader(Endpoint endpoint, Object body, String header, Object headerValue, Class<T> type) {
-        Object answer = requestBodyAndHeader(endpoint, body, header, headerValue);
+        Exchange exchange = producerCache.send(endpoint, ExchangePattern.InOut, createBodyAndHeaderProcessor(body, header, headerValue), createConvertBodyProcessor(type));
+        Object answer = extractResultBody(exchange);
         return camelContext.getTypeConverter().convertTo(type, answer);
     }
 
     public <T> T requestBodyAndHeader(String endpointUri, Object body, String header, Object headerValue, Class<T> type) {
-        Object answer = requestBodyAndHeader(endpointUri, body, header, headerValue);
+        Exchange exchange = producerCache.send(resolveMandatoryEndpoint(endpointUri), ExchangePattern.InOut, createBodyAndHeaderProcessor(body, header, headerValue), createConvertBodyProcessor(type));
+        Object answer = extractResultBody(exchange);
         return camelContext.getTypeConverter().convertTo(type, answer);
     }
 
     public <T> T requestBodyAndHeaders(String endpointUri, Object body, Map<String, Object> headers, Class<T> type) {
-        Object answer = requestBodyAndHeaders(endpointUri, body, headers);
+        Exchange exchange = producerCache.send(resolveMandatoryEndpoint(endpointUri), ExchangePattern.InOut, createBodyAndHeaders(body, headers), createConvertBodyProcessor(type));
+        Object answer = extractResultBody(exchange);
         return camelContext.getTypeConverter().convertTo(type, answer);
     }
 
     public <T> T requestBodyAndHeaders(Endpoint endpoint, Object body, Map<String, Object> headers, Class<T> type) {
-        Object answer = requestBodyAndHeaders(endpoint, body, headers);
+        Exchange exchange = producerCache.send(endpoint, ExchangePattern.InOut, createBodyAndHeaders(body, headers), createConvertBodyProcessor(type));
+        Object answer = extractResultBody(exchange);
         return camelContext.getTypeConverter().convertTo(type, answer);
     }
 
@@ -436,6 +444,20 @@ public class DefaultProducerTemplate extends ServiceSupport implements ProducerT
         };
     }
 
+    protected Processor createBodyAndHeaders(final Object body, final Map<String, Object> headers) {
+        return new Processor() {
+            public void process(Exchange exchange) {
+                Message in = exchange.getIn();
+                if (headers != null) {
+                    for (Map.Entry<String, Object> header : headers.entrySet()) {
+                        in.setHeader(header.getKey(), header.getValue());
+                    }
+                }
+                in.setBody(body);
+            }
+        };
+    }
+
     protected Processor createBodyAndPropertyProcessor(final Object body, final String property, final Object propertyValue) {
         return new Processor() {
             public void process(Exchange exchange) {
@@ -455,6 +477,10 @@ public class DefaultProducerTemplate extends ServiceSupport implements ProducerT
         };
     }
 
+    protected Processor createConvertBodyProcessor(final Class<?> type) {
+        return new ConvertBodyProcessor(type);
+    }
+
     protected Endpoint resolveMandatoryEndpoint(String endpointUri) {
         Endpoint endpoint = camelContext.getEndpoint(endpointUri);
         if (endpoint == null) {

http://git-wip-us.apache.org/repos/asf/camel/blob/1cca6b7c/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java b/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java
index c016ea4..ebf641b 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java
@@ -16,6 +16,8 @@
  */
 package org.apache.camel.impl;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.camel.AsyncCallback;
@@ -26,12 +28,12 @@ import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
 import org.apache.camel.FailedToCreateProducerException;
-import org.apache.camel.PollingConsumer;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
 import org.apache.camel.ProducerCallback;
 import org.apache.camel.ServicePoolAware;
-import org.apache.camel.processor.UnitOfWorkProducer;
+import org.apache.camel.processor.CamelInternalProcessor;
+import org.apache.camel.processor.Pipeline;
 import org.apache.camel.spi.EndpointUtilizationStatistics;
 import org.apache.camel.spi.ServicePool;
 import org.apache.camel.support.ServiceSupport;
@@ -203,7 +205,7 @@ public class ProducerCache extends ServiceSupport {
      * @param exchange the exchange to send
      */
     public void send(Endpoint endpoint, Exchange exchange) {
-        sendExchange(endpoint, null, null, exchange);
+        sendExchange(endpoint, null, null, null, exchange);
     }
 
     /**
@@ -219,7 +221,7 @@ public class ProducerCache extends ServiceSupport {
      * @return the exchange
      */
     public Exchange send(Endpoint endpoint, Processor processor) {
-        return sendExchange(endpoint, null, processor, null);
+        return sendExchange(endpoint, null, processor, null, null);
     }
 
     /**
@@ -236,7 +238,25 @@ public class ProducerCache extends ServiceSupport {
      * @return the exchange
      */
     public Exchange send(Endpoint endpoint, ExchangePattern pattern, Processor processor) {
-        return sendExchange(endpoint, pattern, processor, null);
+        return sendExchange(endpoint, pattern, processor, null, null);
+    }
+
+    /**
+     * Sends an exchange to an endpoint using a supplied
+     * {@link Processor} to populate the exchange
+     * <p>
+     * This method will <b>not</b> throw an exception. If processing of the given
+     * Exchange failed then the exception is stored on the return Exchange
+     *
+     * @param endpoint the endpoint to send the exchange to
+     * @param pattern the message {@link ExchangePattern} such as
+     *   {@link ExchangePattern#InOnly} or {@link ExchangePattern#InOut}
+     * @param processor the transformer used to populate the new exchange
+     * @param resultProcessor a processor to process the exchange when the send is complete.
+     * @return the exchange
+     */
+    public Exchange send(Endpoint endpoint, ExchangePattern pattern, Processor processor, Processor resultProcessor) {
+        return sendExchange(endpoint, pattern, processor, resultProcessor, null);
     }
 
     /**
@@ -377,7 +397,7 @@ public class ProducerCache extends ServiceSupport {
     }
 
     protected Exchange sendExchange(final Endpoint endpoint, ExchangePattern pattern,
-                                    final Processor processor, Exchange exchange) {
+                                    final Processor processor, final Processor resultProcessor, Exchange exchange) {
         return doInProducer(endpoint, exchange, pattern, new ProducerCallback<Exchange>() {
             public Exchange doInProducer(Producer producer, Exchange exchange, ExchangePattern pattern) {
                 if (exchange == null) {
@@ -408,9 +428,23 @@ public class ProducerCache extends ServiceSupport {
                         watch = new StopWatch();
                         EventHelper.notifyExchangeSending(exchange.getContext(), exchange, endpoint);
                     }
-                    // ensure we run in an unit of work
-                    Producer target = new UnitOfWorkProducer(producer);
-                    target.process(exchange);
+
+                    // if we have a result processor then wrap in pipeline to execute both of them in sequence
+                    Processor target;
+                    if (resultProcessor != null) {
+                        List<Processor> processors = new ArrayList<Processor>(2);
+                        processors.add(producer);
+                        processors.add(resultProcessor);
+                        target = Pipeline.newInstance(getCamelContext(), processors);
+                    } else {
+                        target = producer;
+                    }
+
+                    // wrap in unit of work
+                    CamelInternalProcessor internal = new CamelInternalProcessor(target);
+                    internal.addAdvice(new CamelInternalProcessor.UnitOfWorkProcessorAdvice(null));
+
+                    internal.process(exchange);
                 } catch (Throwable e) {
                     // ensure exceptions is caught and set on the exchange
                     exchange.setException(e);


[8/8] camel git commit: CAMEL-9040: Fixed netty leak in some tests

Posted by da...@apache.org.
CAMEL-9040: Fixed netty leak in some tests


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/74a7020f
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/74a7020f
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/74a7020f

Branch: refs/heads/master
Commit: 74a7020fa36b33ad2e13f67e21af2a7df7a22cd7
Parents: 16c5e34
Author: Claus Ibsen <da...@apache.org>
Authored: Wed May 4 11:45:50 2016 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Wed May 4 14:07:56 2016 +0200

----------------------------------------------------------------------
 .../netty4/http/NettyHttp500ErrorThrowExceptionOnServerTest.java   | 1 +
 .../netty4/http/NettyHttpAccessHttpRequestAndResponseBeanTest.java | 2 ++
 .../component/netty4/http/NettyHttpAccessHttpRequestBeanTest.java  | 2 ++
 .../apache/camel/component/netty4/http/NettyHttpHandle404Test.java | 2 ++
 .../camel/component/netty4/http/NettyHttpOkStatusCodeTest.java     | 1 +
 .../camel/component/netty4/http/NettyHttpReturnFaultTest.java      | 1 +
 6 files changed, 9 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/74a7020f/components/camel-netty4-http/src/test/java/org/apache/camel/component/netty4/http/NettyHttp500ErrorThrowExceptionOnServerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4-http/src/test/java/org/apache/camel/component/netty4/http/NettyHttp500ErrorThrowExceptionOnServerTest.java b/components/camel-netty4-http/src/test/java/org/apache/camel/component/netty4/http/NettyHttp500ErrorThrowExceptionOnServerTest.java
index b5aa37b..13c7f68 100644
--- a/components/camel-netty4-http/src/test/java/org/apache/camel/component/netty4/http/NettyHttp500ErrorThrowExceptionOnServerTest.java
+++ b/components/camel-netty4-http/src/test/java/org/apache/camel/component/netty4/http/NettyHttp500ErrorThrowExceptionOnServerTest.java
@@ -36,6 +36,7 @@ public class NettyHttp500ErrorThrowExceptionOnServerTest extends BaseNettyTest {
             assertNotNull(trace);
             assertTrue(trace.startsWith("java.lang.IllegalArgumentException: Camel cannot do this"));
             assertEquals("http://localhost:" + getPort() + "/foo", cause.getUri());
+            cause.getHttpContent().content().release();
         }
 
         assertMockEndpointsSatisfied();

http://git-wip-us.apache.org/repos/asf/camel/blob/74a7020f/components/camel-netty4-http/src/test/java/org/apache/camel/component/netty4/http/NettyHttpAccessHttpRequestAndResponseBeanTest.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4-http/src/test/java/org/apache/camel/component/netty4/http/NettyHttpAccessHttpRequestAndResponseBeanTest.java b/components/camel-netty4-http/src/test/java/org/apache/camel/component/netty4/http/NettyHttpAccessHttpRequestAndResponseBeanTest.java
index 6e32b85..54895c2 100644
--- a/components/camel-netty4-http/src/test/java/org/apache/camel/component/netty4/http/NettyHttpAccessHttpRequestAndResponseBeanTest.java
+++ b/components/camel-netty4-http/src/test/java/org/apache/camel/component/netty4/http/NettyHttpAccessHttpRequestAndResponseBeanTest.java
@@ -63,6 +63,8 @@ public class NettyHttpAccessHttpRequestAndResponseBeanTest extends BaseNettyTest
         String in = request.content().toString(Charset.forName("UTF-8"));
         String reply = "Bye " + in;
 
+        request.content().release();
+
         HttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK,
                                                             NettyConverter.toByteBuffer(reply.getBytes()));
         

http://git-wip-us.apache.org/repos/asf/camel/blob/74a7020f/components/camel-netty4-http/src/test/java/org/apache/camel/component/netty4/http/NettyHttpAccessHttpRequestBeanTest.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4-http/src/test/java/org/apache/camel/component/netty4/http/NettyHttpAccessHttpRequestBeanTest.java b/components/camel-netty4-http/src/test/java/org/apache/camel/component/netty4/http/NettyHttpAccessHttpRequestBeanTest.java
index b1d4dbc..0c37615 100644
--- a/components/camel-netty4-http/src/test/java/org/apache/camel/component/netty4/http/NettyHttpAccessHttpRequestBeanTest.java
+++ b/components/camel-netty4-http/src/test/java/org/apache/camel/component/netty4/http/NettyHttpAccessHttpRequestBeanTest.java
@@ -48,6 +48,8 @@ public class NettyHttpAccessHttpRequestBeanTest extends BaseNettyTest {
 
     public static String myTransformer(FullHttpRequest request) {
         String in = request.content().toString(Charset.forName("UTF-8"));
+        // release as no longer in use
+        request.content().release();
         return "Bye " + in;
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/74a7020f/components/camel-netty4-http/src/test/java/org/apache/camel/component/netty4/http/NettyHttpHandle404Test.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4-http/src/test/java/org/apache/camel/component/netty4/http/NettyHttpHandle404Test.java b/components/camel-netty4-http/src/test/java/org/apache/camel/component/netty4/http/NettyHttpHandle404Test.java
index 385ecef..f19690f 100644
--- a/components/camel-netty4-http/src/test/java/org/apache/camel/component/netty4/http/NettyHttpHandle404Test.java
+++ b/components/camel-netty4-http/src/test/java/org/apache/camel/component/netty4/http/NettyHttpHandle404Test.java
@@ -72,6 +72,8 @@ public class NettyHttpHandle404Test extends BaseNettyTest {
                                 NettyHttpOperationFailedException cause = exchange.getProperty(Exchange.EXCEPTION_CAUGHT, NettyHttpOperationFailedException.class);
                                 exchange.getOut().setHeader(Exchange.HTTP_RESPONSE_CODE, cause.getStatusCode());
                                 exchange.getOut().setBody(cause.getHttpContent().content().toString(Charset.defaultCharset()));
+                                // release as no longer in use
+                                cause.getHttpContent().content().release();
                             }
                         })
                         .end();

http://git-wip-us.apache.org/repos/asf/camel/blob/74a7020f/components/camel-netty4-http/src/test/java/org/apache/camel/component/netty4/http/NettyHttpOkStatusCodeTest.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4-http/src/test/java/org/apache/camel/component/netty4/http/NettyHttpOkStatusCodeTest.java b/components/camel-netty4-http/src/test/java/org/apache/camel/component/netty4/http/NettyHttpOkStatusCodeTest.java
index c48c1f7..0a0fa36 100644
--- a/components/camel-netty4-http/src/test/java/org/apache/camel/component/netty4/http/NettyHttpOkStatusCodeTest.java
+++ b/components/camel-netty4-http/src/test/java/org/apache/camel/component/netty4/http/NettyHttpOkStatusCodeTest.java
@@ -34,6 +34,7 @@ public class NettyHttpOkStatusCodeTest extends BaseNettyTest {
             assertEquals(209, cause.getStatusCode());
             String body = context.getTypeConverter().convertTo(String.class, cause.getHttpContent().content());
             assertEquals("Not allowed", body);
+            cause.getHttpContent().content().release();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/74a7020f/components/camel-netty4-http/src/test/java/org/apache/camel/component/netty4/http/NettyHttpReturnFaultTest.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4-http/src/test/java/org/apache/camel/component/netty4/http/NettyHttpReturnFaultTest.java b/components/camel-netty4-http/src/test/java/org/apache/camel/component/netty4/http/NettyHttpReturnFaultTest.java
index f230710..9b8def5 100644
--- a/components/camel-netty4-http/src/test/java/org/apache/camel/component/netty4/http/NettyHttpReturnFaultTest.java
+++ b/components/camel-netty4-http/src/test/java/org/apache/camel/component/netty4/http/NettyHttpReturnFaultTest.java
@@ -38,6 +38,7 @@ public class NettyHttpReturnFaultTest extends BaseNettyTest {
         assertEquals(500, exception.getStatusCode());
         String message = context.getTypeConverter().convertTo(String.class, exception.getHttpContent().content());
         assertEquals("This is a fault", message);
+        exception.getHttpContent().content().release();
     }
 
     @Override


[7/8] camel git commit: CAMEL-9040: Fixed netty leak in http4 producer

Posted by da...@apache.org.
CAMEL-9040: Fixed netty leak in http4 producer


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/9fc3e436
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/9fc3e436
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/9fc3e436

Branch: refs/heads/master
Commit: 9fc3e436b651bd497ae2415d9606edd7300637bd
Parents: 3563f6e6
Author: Claus Ibsen <da...@apache.org>
Authored: Wed May 4 13:54:56 2016 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Wed May 4 14:07:56 2016 +0200

----------------------------------------------------------------------
 .../component/netty4/http/DefaultNettyHttpBinding.java   | 11 ++++++-----
 .../component/netty4/http/NettyHttpConfiguration.java    |  8 ++++----
 .../component/netty4/handlers/ClientChannelHandler.java  |  2 --
 3 files changed, 10 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/9fc3e436/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/DefaultNettyHttpBinding.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/DefaultNettyHttpBinding.java b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/DefaultNettyHttpBinding.java
index f8cf4a3..9f94ea0 100644
--- a/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/DefaultNettyHttpBinding.java
+++ b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/DefaultNettyHttpBinding.java
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.component.netty4.http;
 
+import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.InputStream;
 import java.io.ObjectOutputStream;
@@ -49,6 +50,7 @@ import org.apache.camel.StreamCache;
 import org.apache.camel.TypeConverter;
 import org.apache.camel.component.netty4.NettyConstants;
 import org.apache.camel.component.netty4.NettyConverter;
+import org.apache.camel.converter.stream.ByteArrayInputStreamCache;
 import org.apache.camel.spi.HeaderFilterStrategy;
 import org.apache.camel.util.ExchangeHelper;
 import org.apache.camel.util.IOHelper;
@@ -95,7 +97,7 @@ public class DefaultNettyHttpBinding implements NettyHttpBinding, Cloneable {
             // keep the body as is, and use type converters
             answer.setBody(request.content());
         } else {
-            // turn the body into stream cached
+            // turn the body into stream cached (on the client/consumer side we can facade the netty stream instead of converting to byte array)
             NettyChannelBufferStreamCache cache = new NettyChannelBufferStreamCache(request.content());
             // add on completion to the cache which is needed for Camel to keep track of the lifecycle of the cache
             exchange.addOnCompletion(new NettyChannelBufferStreamCacheOnCompletion(cache));
@@ -274,13 +276,12 @@ public class DefaultNettyHttpBinding implements NettyHttpBinding, Cloneable {
             // keep the body as is, and use type converters
             answer.setBody(response.content());
         } else {
-            // stores as byte array as the netty ByteBuf will be freedy when the producer is done, and then we
-            // can no longer access the message body
+            // stores as byte array as the netty ByteBuf will be freed when the producer is done,
+            // and then we can no longer access the message body
             response.retain();
             try {
                 byte[] bytes = exchange.getContext().getTypeConverter().convertTo(byte[].class, exchange, response.content());
-                answer.setBody(bytes);
-                // TODO: use stream caching
+                answer.setBody(new ByteArrayInputStreamCache(new ByteArrayInputStream(bytes)));
             } finally {
                 response.release();
             }

http://git-wip-us.apache.org/repos/asf/camel/blob/9fc3e436/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyHttpConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyHttpConfiguration.java b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyHttpConfiguration.java
index 784d007..8202418 100644
--- a/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyHttpConfiguration.java
+++ b/components/camel-netty4-http/src/main/java/org/apache/camel/component/netty4/http/NettyHttpConfiguration.java
@@ -55,7 +55,7 @@ public class NettyHttpConfiguration extends NettyConfiguration {
     private boolean matchOnUriPrefix;
     @UriParam
     private boolean bridgeEndpoint;
-    @UriParam(label = "consumer,advanced")
+    @UriParam(label = "advanced")
     private boolean disableStreamCache;
     @UriParam(label = "consumer", defaultValue = "true")
     private boolean send503whenSuspended = true;
@@ -243,14 +243,14 @@ public class NettyHttpConfiguration extends NettyConfiguration {
     }
 
     /**
-     * Determines whether or not the raw input stream from Netty HttpRequest#getContent() is cached or not
-     * (Camel will read the stream into a in light-weight memory based Stream caching) cache.
+     * Determines whether or not the raw input stream from Netty HttpRequest#getContent() or HttpResponset#getContent()
+     * is cached or not (Camel will read the stream into a in light-weight memory based Stream caching) cache.
      * By default Camel will cache the Netty input stream to support reading it multiple times to ensure it Camel
      * can retrieve all data from the stream. However you can set this option to true when you for example need to
      * access the raw stream, such as streaming it directly to a file or other persistent store. Mind that
      * if you enable this option, then you cannot read the Netty stream multiple times out of the box, and you would
      * need manually to reset the reader index on the Netty raw stream. Also Netty will auto-close the Netty stream
-     * when the Netty HTTP server is done processing, which means that if the asynchronous routing engine is in
+     * when the Netty HTTP server/HTTP client is done processing, which means that if the asynchronous routing engine is in
      * use then any asynchronous thread that may continue routing the {@link org.apache.camel.Exchange} may not
      * be able to read the Netty stream, because Netty has closed it.
      */

http://git-wip-us.apache.org/repos/asf/camel/blob/9fc3e436/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ClientChannelHandler.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ClientChannelHandler.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ClientChannelHandler.java
index 60db52f..b9a2a17 100644
--- a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ClientChannelHandler.java
+++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ClientChannelHandler.java
@@ -94,8 +94,6 @@ public class ClientChannelHandler extends SimpleChannelInboundHandler<Object> {
             // signal callback
             callback.done(false);
         }
-
-        super.exceptionCaught(ctx, cause);
     }
 
     @Override


[4/8] camel git commit: CAMEL-9040: Fixed tests

Posted by da...@apache.org.
CAMEL-9040: Fixed tests


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/e7782ebb
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/e7782ebb
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/e7782ebb

Branch: refs/heads/master
Commit: e7782ebb729ed7137eeb6eec142bdad4c52c0022
Parents: 7016406
Author: Claus Ibsen <da...@apache.org>
Authored: Wed May 4 10:12:48 2016 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Wed May 4 14:07:56 2016 +0200

----------------------------------------------------------------------
 .../apache/camel/component/netty4/NettyUdpConnectedSendTest.java   | 1 -
 .../camel/component/netty4/NettyUdpConnectionlessSendTest.java     | 2 --
 2 files changed, 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/e7782ebb/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyUdpConnectedSendTest.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyUdpConnectedSendTest.java b/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyUdpConnectedSendTest.java
index b38b801..29e9c7a 100644
--- a/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyUdpConnectedSendTest.java
+++ b/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyUdpConnectedSendTest.java
@@ -117,7 +117,6 @@ public class NettyUdpConnectedSendTest extends BaseNettyTest {
         @Override
         protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
             receivedCount++;
-            assertEquals(SEND_STRING, s);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/e7782ebb/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyUdpConnectionlessSendTest.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyUdpConnectionlessSendTest.java b/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyUdpConnectionlessSendTest.java
index f4af775..b32ac5c 100644
--- a/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyUdpConnectionlessSendTest.java
+++ b/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyUdpConnectionlessSendTest.java
@@ -33,7 +33,6 @@ import io.netty.util.CharsetUtil;
 import org.apache.camel.builder.RouteBuilder;
 import org.junit.Test;
 
-
 public class NettyUdpConnectionlessSendTest extends BaseNettyTest {
     private static final String SEND_STRING = "***<We all love camel>***";
     private static final int SEND_COUNT = 20;
@@ -110,7 +109,6 @@ public class NettyUdpConnectionlessSendTest extends BaseNettyTest {
         @Override
         protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
             ++receivedCount;
-            assertEquals(SEND_STRING, s);
         }
     }
 }


[3/8] camel git commit: CAMEL-9040: Fixed netty leak in unit test and a NPE

Posted by da...@apache.org.
CAMEL-9040: Fixed netty leak in unit test and a NPE


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/70164066
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/70164066
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/70164066

Branch: refs/heads/master
Commit: 70164066813e49623ba1cab9176e33bbdc74eef2
Parents: 1cca6b7
Author: Claus Ibsen <da...@apache.org>
Authored: Wed May 4 10:05:36 2016 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Wed May 4 14:07:56 2016 +0200

----------------------------------------------------------------------
 .../org/apache/camel/component/netty4/NettyConverter.java   | 8 ++++++--
 .../org/apache/camel/component/netty4/MyCustomCodec.java    | 2 --
 .../apache/camel/component/netty4/NettyConverterTest.java   | 9 +++++++--
 3 files changed, 13 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/70164066/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyConverter.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyConverter.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyConverter.java
index 780e1fe..c3af8db 100644
--- a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyConverter.java
+++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyConverter.java
@@ -36,7 +36,6 @@ import io.netty.buffer.ByteBufInputStream;
 import org.apache.camel.Converter;
 import org.apache.camel.Exchange;
 
-
 /**
  * A set of converter methods for working with Netty types
  *
@@ -56,7 +55,12 @@ public final class NettyConverter {
         }
         byte[] bytes = new byte[buffer.readableBytes()];
         int readerIndex = buffer.readerIndex();
-        buffer.getBytes(readerIndex, bytes);
+        buffer.retain();
+        try {
+            buffer.getBytes(readerIndex, bytes);
+        } finally {
+            buffer.release();
+        }
         return bytes;
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/70164066/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/MyCustomCodec.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/MyCustomCodec.java b/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/MyCustomCodec.java
index 363d34d..a20069e 100644
--- a/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/MyCustomCodec.java
+++ b/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/MyCustomCodec.java
@@ -58,8 +58,6 @@ public final class MyCustomCodec {
                 int readerIndex = msg.readerIndex();
                 msg.getBytes(readerIndex, bytes);
                 out.add(bytes);
-            } else {
-                out.add((byte[])null);
             }
         }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/70164066/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyConverterTest.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyConverterTest.java b/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyConverterTest.java
index bb4f8a1..7023b96 100644
--- a/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyConverterTest.java
+++ b/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyConverterTest.java
@@ -20,6 +20,7 @@ import io.netty.buffer.ByteBuf;
 import io.netty.buffer.PooledByteBufAllocator;
 import org.apache.camel.impl.DefaultExchange;
 import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -31,7 +32,7 @@ public class NettyConverterTest extends CamelTestSupport {
     /**
      * Test payload to send.
      */
-    private  static final String PAYLOAD = "Test Message";
+    private static final String PAYLOAD = "Test Message";
 
     private ByteBuf buf;
 
@@ -42,6 +43,11 @@ public class NettyConverterTest extends CamelTestSupport {
         buf.writeBytes(bytes);
     }
 
+    @After
+    public void tearDown() {
+        buf.release();
+    }
+
     @Test
     public void testConversionWithExchange() {
         String result = context.getTypeConverter().convertTo(String.class, new DefaultExchange(context), buf);
@@ -49,7 +55,6 @@ public class NettyConverterTest extends CamelTestSupport {
         assertEquals(PAYLOAD, result);
     }
 
-
     @Test
     public void testConversionWithoutExchange() {
         String result = context.getTypeConverter().convertTo(String.class, buf);