You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by co...@apache.org on 2020/02/03 11:27:07 UTC

[camel] 01/01: CAMEL-14477 - Disable object serialization

This is an automated email from the ASF dual-hosted git repository.

coheigea pushed a commit to branch CAMEL-14477
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 58aff9f9cd4aab3163b8eda8281cb795cb3b59c8
Author: Colm O hEigeartaigh <co...@apache.org>
AuthorDate: Mon Feb 3 11:26:26 2020 +0000

    CAMEL-14477 - Disable object serialization
---
 .../camel-netty/src/main/docs/netty-component.adoc |  22 +++-
 .../component/netty/ChannelHandlerFactories.java   |  31 ------
 .../camel/component/netty/NettyConfiguration.java  |  14 +--
 .../camel/component/netty/NettyBacklogTest.java    |   6 --
 .../camel/component/netty/NettyConcurrentTest.java |  25 ++++-
 .../camel/component/netty/NettyOptionTest.java     |   6 --
 .../camel/component/netty/NettyTCPAsyncTest.java   |  17 ++-
 .../camel/component/netty/NettyTCPChainedTest.java |  12 ++-
 .../netty/NettyTCPSyncNotLazyChannelTest.java      |  13 ---
 .../camel/component/netty/NettyTCPSyncTest.java    |  13 ---
 .../netty/NettyTransferExchangeOptionTest.java     |  25 ++++-
 .../camel/component/netty/NettyUDPAsyncTest.java   |   2 +-
 .../component/netty/NettyUDPObjectSyncTest.java    |  50 ---------
 .../component/netty/ObjectSerializationTest.java   | 116 +++++++++++++++++++++
 .../endpoint/dsl/NettyEndpointBuilderFactory.java  |  18 ++--
 15 files changed, 224 insertions(+), 146 deletions(-)

diff --git a/components/camel-netty/src/main/docs/netty-component.adoc b/components/camel-netty/src/main/docs/netty-component.adoc
index 3e540cf..aa9904a 100644
--- a/components/camel-netty/src/main/docs/netty-component.adoc
+++ b/components/camel-netty/src/main/docs/netty-component.adoc
@@ -157,7 +157,7 @@ with the following path and query parameters:
 | *delimiter* (codec) | The delimiter to use for the textline codec. Possible values are LINE and NULL. | LINE | TextLineDelimiter
 | *encoders* (codec) | A list of encoders to be used. You can use a String which have values separated by comma, and have the values be looked up in the Registry. Just remember to prefix the value with # so Camel knows it should lookup. |  | List
 | *encoding* (codec) | The encoding (a charset name) to use for the textline codec. If not provided, Camel will use the JVM default Charset. |  | String
-| *textline* (codec) | Only used for TCP. If no codec is specified, you can use this flag to indicate a text line based codec; if not specified or the value is false, then Object Serialization is assumed over TCP. | false | boolean
+| *textline* (codec) | Only used for TCP. If no codec is specified, you can use this flag to indicate a text line based codec; if not specified or the value is false, then Object Serialization is assumed over TCP - however only Strings are allowed to be serialized by default. | false | boolean
 | *enabledProtocols* (security) | Which protocols to enable when using SSL | TLSv1,TLSv1.1,TLSv1.2 | String
 | *keyStoreFile* (security) | Client side certificate keystore to be used for encryption |  | File
 | *keyStoreFormat* (security) | Keystore format to be used for payload encryption. Defaults to JKS if not set |  | String
@@ -257,7 +257,7 @@ The component supports 80 options, which are listed below.
 | *camel.component.netty.configuration.ssl-handler* | Reference to a class that could be used to return an SSL Handler |  | SslHandler
 | *camel.component.netty.configuration.sync* | Setting to set endpoint as one-way or request-response | true | Boolean
 | *camel.component.netty.configuration.tcp-no-delay* | Setting to improve TCP protocol performance | true | Boolean
-| *camel.component.netty.configuration.textline* | Only used for TCP. If no codec is specified, you can use this flag to indicate a text line based codec; if not specified or the value is false, then Object Serialization is assumed over TCP. | false | Boolean
+| *camel.component.netty.configuration.textline* | Only used for TCP. If no codec is specified, you can use this flag to indicate a text line based codec; if not specified or the value is false, then Object Serialization is assumed over TCP. However note that only Strings are serialized, anything else will only be serialized with a custom encoder/decoder. | false | Boolean
 | *camel.component.netty.configuration.transfer-exchange* | Only used for TCP. You can transfer the exchange over the wire instead of just the body. The following fields are transferred: In body, Out body, fault body, In headers, Out headers, fault headers, exchange properties, exchange exception. This requires that the objects are serializable. Camel will exclude any non-serializable objects and log it at WARN level. | false | Boolean
 | *camel.component.netty.configuration.trust-store-resource* | Server side certificate keystore to be used for encryption. Is loaded by default from classpath, but you can prefix with "classpath:", "file:", or "http:" to load the resource from different systems. |  | String
 | *camel.component.netty.configuration.udp-byte-array-codec* | For UDP only. If enabled the using byte array codec instead of Java serialization protocol. | false | Boolean
@@ -378,16 +378,28 @@ operations.
 
 === A UDP Netty endpoint using Request-Reply and serialized object payload
 
+Note that Object serialization is not allowed by default, and so a decoder must be configured.
+
 [source,java]
 ----
+@BindToRegistry("decoder")
+public ChannelHandler getDecoder() throws Exception {
+    return new DefaultChannelHandlerFactory() {
+        @Override
+        public ChannelHandler newChannelHandler() {
+            return new DatagramPacketObjectDecoder(ClassResolvers.weakCachingResolver(null));
+        }
+    };
+}
+
 RouteBuilder builder = new RouteBuilder() {
   public void configure() {
-    from("netty:udp://0.0.0.0:5155?sync=true")
+    from("netty:udp://0.0.0.0:5155?sync=true&decoders=#decoder")
       .process(new Processor() {
          public void process(Exchange exchange) throws Exception {
            Poetry poetry = (Poetry) exchange.getIn().getBody();
-           poetry.setPoet("Dr. Sarojini Naidu");
-           exchange.getOut().setBody(poetry);
+           // Process poetry in some way
+           exchange.getOut().setBody("Message received);
          }
        }
     }
diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/ChannelHandlerFactories.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/ChannelHandlerFactories.java
index 7eff6a6..dbcdadc 100644
--- a/components/camel-netty/src/main/java/org/apache/camel/component/netty/ChannelHandlerFactories.java
+++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/ChannelHandlerFactories.java
@@ -23,7 +23,6 @@ import io.netty.channel.ChannelHandler;
 import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
 import io.netty.handler.codec.bytes.ByteArrayDecoder;
 import io.netty.handler.codec.bytes.ByteArrayEncoder;
-import io.netty.handler.codec.serialization.ClassResolvers;
 import io.netty.handler.codec.string.StringDecoder;
 import io.netty.handler.codec.string.StringEncoder;
 import org.apache.camel.component.netty.codec.DatagramPacketByteArrayDecoder;
@@ -31,13 +30,9 @@ import org.apache.camel.component.netty.codec.DatagramPacketByteArrayEncoder;
 import org.apache.camel.component.netty.codec.DatagramPacketDecoder;
 import org.apache.camel.component.netty.codec.DatagramPacketDelimiterDecoder;
 import org.apache.camel.component.netty.codec.DatagramPacketEncoder;
-import org.apache.camel.component.netty.codec.DatagramPacketObjectDecoder;
-import org.apache.camel.component.netty.codec.DatagramPacketObjectEncoder;
 import org.apache.camel.component.netty.codec.DatagramPacketStringDecoder;
 import org.apache.camel.component.netty.codec.DatagramPacketStringEncoder;
 import org.apache.camel.component.netty.codec.DelimiterBasedFrameDecoder;
-import org.apache.camel.component.netty.codec.ObjectDecoder;
-import org.apache.camel.component.netty.codec.ObjectEncoder;
 
 /**
  * Helper to create commonly used {@link ChannelHandlerFactory} instances.
@@ -64,32 +59,6 @@ public final class ChannelHandlerFactories {
     }
 
 
-    public static ChannelHandlerFactory newObjectDecoder(String protocol) {
-        if ("udp".equalsIgnoreCase(protocol)) {
-            return new DefaultChannelHandlerFactory() {
-                @Override
-                public ChannelHandler newChannelHandler() {
-                    return new DatagramPacketObjectDecoder(ClassResolvers.weakCachingResolver(null));
-                }
-            };
-        } else {
-            return new DefaultChannelHandlerFactory() {
-                @Override
-                public ChannelHandler newChannelHandler() {
-                    return new ObjectDecoder(ClassResolvers.weakCachingResolver(null));
-                }
-            };
-        }
-    }
-
-    public static ChannelHandlerFactory newObjectEncoder(String protocol) {
-        if ("udp".equals(protocol)) {
-            return new ShareableChannelHandlerFactory(new DatagramPacketObjectEncoder());
-        } else {
-            return new ShareableChannelHandlerFactory(new ObjectEncoder());
-        }
-    }
-
     public static ChannelHandlerFactory newDelimiterBasedFrameDecoder(final int maxFrameLength, final ByteBuf[] delimiters, String protocol) {
         return newDelimiterBasedFrameDecoder(maxFrameLength, delimiters, true, protocol);
     }
diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java
index 18f88d2..37a9ba1 100644
--- a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java
+++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java
@@ -209,7 +209,7 @@ public class NettyConfiguration extends NettyServerBootstrapConfiguration implem
                 if ("udp".equalsIgnoreCase(protocol)) {
                     encoders.add(ChannelHandlerFactories.newDatagramPacketEncoder());
                 }
-                // are we textline or object?
+                // are we textline or byte array
                 if (isTextline()) {
                     Charset charset = getEncoding() != null ? Charset.forName(getEncoding()) : CharsetUtil.UTF_8;
                     encoders.add(ChannelHandlerFactories.newStringEncoder(charset, protocol));
@@ -225,11 +225,10 @@ public class NettyConfiguration extends NettyServerBootstrapConfiguration implem
                     encoders.add(ChannelHandlerFactories.newByteArrayEncoder(protocol));
                     decoders.add(ChannelHandlerFactories.newByteArrayDecoder(protocol));
                 } else {
-                    // object serializable is then used
-                    encoders.add(ChannelHandlerFactories.newObjectEncoder(protocol));
-                    decoders.add(ChannelHandlerFactories.newObjectDecoder(protocol));
-
-                    LOG.debug("Using object encoders and decoders");
+                    // Fall back to allowing Strings to be serialized only
+                    Charset charset = getEncoding() != null ? Charset.forName(getEncoding()) : CharsetUtil.UTF_8;
+                    encoders.add(ChannelHandlerFactories.newStringEncoder(charset, protocol));
+                    decoders.add(ChannelHandlerFactories.newStringDecoder(charset, protocol));
                 }
                 if ("udp".equalsIgnoreCase(protocol)) {
                     decoders.add(ChannelHandlerFactories.newDatagramPacketDecoder());
@@ -300,7 +299,8 @@ public class NettyConfiguration extends NettyServerBootstrapConfiguration implem
 
     /**
      * Only used for TCP. If no codec is specified, you can use this flag to indicate a text line based codec;
-     * if not specified or the value is false, then Object Serialization is assumed over TCP.
+     * if not specified or the value is false, then Object Serialization is assumed over TCP - however only Strings
+     * are allowed to be serialized by default.
      */
     public void setTextline(boolean textline) {
         this.textline = textline;
diff --git a/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyBacklogTest.java b/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyBacklogTest.java
index def966c..7d6cdf2 100644
--- a/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyBacklogTest.java
+++ b/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyBacklogTest.java
@@ -30,12 +30,6 @@ public class NettyBacklogTest extends NettyTCPSyncTest {
                 from("netty:tcp://localhost:{{port}}?sync=true&backlog=500")
                     .process(new Processor() {
                         public void process(Exchange exchange) throws Exception {
-                            if (exchange.getIn().getBody() instanceof Poetry) {
-                                Poetry poetry = (Poetry) exchange.getIn().getBody();
-                                poetry.setPoet("Dr. Sarojini Naidu");
-                                exchange.getOut().setBody(poetry);
-                                return;
-                            }
                             exchange.getOut().setBody("When You Go Home, Tell Them Of Us And Say, For Your Tomorrow, We Gave Our Today.");
                         }
                     });
diff --git a/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyConcurrentTest.java b/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyConcurrentTest.java
index 69c7e62..c30e841 100644
--- a/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyConcurrentTest.java
+++ b/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyConcurrentTest.java
@@ -26,14 +26,20 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.camel.BindToRegistry;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.builder.NotifyBuilder;
 import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.netty.codec.ObjectDecoder;
+import org.apache.camel.component.netty.codec.ObjectEncoder;
 import org.apache.camel.util.StopWatch;
 import org.junit.Ignore;
 import org.junit.Test;
 
+import io.netty.channel.ChannelHandler;
+import io.netty.handler.codec.serialization.ClassResolvers;
+
 public class NettyConcurrentTest extends BaseNettyTest {
 
     @Test
@@ -69,7 +75,7 @@ public class NettyConcurrentTest extends BaseNettyTest {
             final int index = i;
             Future<String> out = executor.submit(new Callable<String>() {
                 public String call() throws Exception {
-                    String reply = template.requestBody("netty:tcp://localhost:{{port}}", index, String.class);
+                    String reply = template.requestBody("netty:tcp://localhost:{{port}}?encoders=#encoder&decoders=#decoder", index, String.class);
                     log.debug("Sent {} received {}", index, reply);
                     assertEquals("Bye " + index, reply);
                     return reply;
@@ -93,11 +99,26 @@ public class NettyConcurrentTest extends BaseNettyTest {
         executor.shutdownNow();
     }
 
+    @BindToRegistry("encoder")
+    public ChannelHandler getEncoder() throws Exception {
+        return new ShareableChannelHandlerFactory(new ObjectEncoder());
+    }
+
+    @BindToRegistry("decoder")
+    public ChannelHandler getDecoder() throws Exception {
+        return new DefaultChannelHandlerFactory() {
+            @Override
+            public ChannelHandler newChannelHandler() {
+                return new ObjectDecoder(ClassResolvers.weakCachingResolver(null));
+            }
+        };
+    }
+
     @Override
     protected RouteBuilder createRouteBuilder() throws Exception {
         return new RouteBuilder() {
             public void configure() throws Exception {
-                from("netty:tcp://localhost:{{port}}?sync=true").process(new Processor() {
+                from("netty:tcp://localhost:{{port}}?sync=true&encoders=#encoder&decoders=#decoder").process(new Processor() {
                     public void process(Exchange exchange) throws Exception {
                         String body = exchange.getIn().getBody(String.class);
                         exchange.getOut().setBody("Bye " + body);
diff --git a/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyOptionTest.java b/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyOptionTest.java
index 2f20c66..02410ae 100644
--- a/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyOptionTest.java
+++ b/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyOptionTest.java
@@ -30,12 +30,6 @@ public class NettyOptionTest extends NettyTCPSyncTest {
                 from("netty:tcp://localhost:{{port}}?sync=true&option.child.keepAlive=false")
                     .process(new Processor() {
                         public void process(Exchange exchange) throws Exception {
-                            if (exchange.getIn().getBody() instanceof Poetry) {
-                                Poetry poetry = (Poetry) exchange.getIn().getBody();
-                                poetry.setPoet("Dr. Sarojini Naidu");
-                                exchange.getOut().setBody(poetry);
-                                return;
-                            }
                             exchange.getOut().setBody("When You Go Home, Tell Them Of Us And Say, For Your Tomorrow, We Gave Our Today.");
                         }
                     });
diff --git a/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTCPAsyncTest.java b/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTCPAsyncTest.java
index 071cd25..c2a794f 100644
--- a/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTCPAsyncTest.java
+++ b/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTCPAsyncTest.java
@@ -19,6 +19,7 @@ package org.apache.camel.component.netty;
 import java.io.FileInputStream;
 import java.io.InputStream;
 
+import org.apache.camel.BindToRegistry;
 import org.apache.camel.EndpointInject;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
@@ -29,6 +30,8 @@ import org.apache.camel.converter.IOConverter;
 import org.apache.camel.util.IOHelper;
 import org.junit.Test;
 
+import io.netty.channel.ChannelHandler;
+
 public class NettyTCPAsyncTest extends BaseNettyTest {
     @EndpointInject("mock:result")
     protected MockEndpoint resultEndpoint;
@@ -50,11 +53,21 @@ public class NettyTCPAsyncTest extends BaseNettyTest {
         });
     }
 
+    @BindToRegistry("encoder")
+    public ChannelHandler getEncoder() throws Exception {
+        return ChannelHandlerFactories.newByteArrayEncoder("tcp");
+    }
+
+    @BindToRegistry("decoder")
+    public ChannelHandler getDecoder() throws Exception {
+        return ChannelHandlerFactories.newByteArrayDecoder("tcp");
+    }
+
     @Test
     public void testTCPInOnlyWithNettyConsumer() throws Exception {
         MockEndpoint mock = getMockEndpoint("mock:result");
         mock.expectedMessageCount(1);
-        sendFile("netty:tcp://localhost:{{port}}?sync=false");
+        sendFile("netty:tcp://localhost:{{port}}?sync=false&encoders=#encoder");
 
         mock.assertIsSatisfied();
     }
@@ -64,7 +77,7 @@ public class NettyTCPAsyncTest extends BaseNettyTest {
         return new RouteBuilder() {
             @Override
             public void configure() throws Exception {
-                from("netty:tcp://localhost:{{port}}?sync=false")
+                from("netty:tcp://localhost:{{port}}?sync=false&decoders=#decoder")
                     .to("log:result")
                     .to("mock:result");
             }
diff --git a/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTCPChainedTest.java b/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTCPChainedTest.java
index 20e04ac..e0f52d6 100644
--- a/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTCPChainedTest.java
+++ b/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTCPChainedTest.java
@@ -19,6 +19,7 @@ package org.apache.camel.component.netty;
 import java.io.FileInputStream;
 import java.io.InputStream;
 
+import org.apache.camel.BindToRegistry;
 import org.apache.camel.EndpointInject;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
@@ -30,6 +31,8 @@ import org.apache.camel.util.IOHelper;
 import org.junit.Assert;
 import org.junit.Test;
 
+import io.netty.channel.ChannelHandler;
+
 /**
  * In this test we are checking that same netty endpoint can be safely called twice
  * in single route with reconnect. It requires for processing to be fully async otherwise
@@ -60,6 +63,11 @@ public class NettyTCPChainedTest extends BaseNettyTest {
         Assert.assertFalse(exchange.isFailed());
     }
 
+    @BindToRegistry("encoder")
+    public ChannelHandler getEncoder() throws Exception {
+        return ChannelHandlerFactories.newByteArrayEncoder("tcp");
+    }
+
     @Test
     public void testTCPChainedConnectionFromCallbackThread() throws Exception {
         MockEndpoint mock = getMockEndpoint("mock:result");
@@ -74,11 +82,11 @@ public class NettyTCPChainedTest extends BaseNettyTest {
         return new RouteBuilder() {
             @Override
             public void configure() throws Exception {
-                from("netty:tcp://localhost:{{port}}?sync=false")
+                from("netty:tcp://localhost:{{port}}?sync=false&encoders=#encoder")
                     .to("log:result")
                     .to("mock:result");
                 from("direct:nettyCall")
-                        .to("netty:tcp://localhost:{{port}}?sync=false&disconnect=true&workerCount=1");
+                        .to("netty:tcp://localhost:{{port}}?sync=false&disconnect=true&workerCount=1&encoders=#encoder");
                 from("direct:chainedCalls")
                         .to("direct:nettyCall")
                         .to("direct:nettyCall");
diff --git a/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTCPSyncNotLazyChannelTest.java b/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTCPSyncNotLazyChannelTest.java
index 0ac6488..3ec9d26 100644
--- a/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTCPSyncNotLazyChannelTest.java
+++ b/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTCPSyncNotLazyChannelTest.java
@@ -31,13 +31,6 @@ public class NettyTCPSyncNotLazyChannelTest extends BaseNettyTest {
         assertEquals("When You Go Home, Tell Them Of Us And Say, For Your Tomorrow, We Gave Our Today.", response);
     }
 
-    @Test
-    public void testTCPObjectInOutWithNettyConsumer() throws Exception {
-        Poetry poetry = new Poetry();
-        Poetry response = (Poetry) template.requestBody("netty:tcp://localhost:{{port}}?sync=true&lazyChannelCreation=false", poetry);
-        assertEquals("Dr. Sarojini Naidu", response.getPoet());
-    }
-
     @Override
     protected RouteBuilder createRouteBuilder() throws Exception {
         return new RouteBuilder() {
@@ -46,12 +39,6 @@ public class NettyTCPSyncNotLazyChannelTest extends BaseNettyTest {
                 from("netty:tcp://localhost:{{port}}?sync=true")
                     .process(new Processor() {
                         public void process(Exchange exchange) throws Exception {
-                            if (exchange.getIn().getBody() instanceof Poetry) {
-                                Poetry poetry = (Poetry) exchange.getIn().getBody();
-                                poetry.setPoet("Dr. Sarojini Naidu");
-                                exchange.getOut().setBody(poetry);
-                                return;
-                            }
                             exchange.getOut().setBody("When You Go Home, Tell Them Of Us And Say, For Your Tomorrow, We Gave Our Today.");
                         }
                     });
diff --git a/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTCPSyncTest.java b/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTCPSyncTest.java
index b04afb21..ea7ce81 100644
--- a/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTCPSyncTest.java
+++ b/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTCPSyncTest.java
@@ -44,13 +44,6 @@ public class NettyTCPSyncTest extends BaseNettyTest {
         assertEquals("When You Go Home, Tell Them Of Us And Say, For Your Tomorrow, We Gave Our Today.", response);
     }
 
-    @Test
-    public void testTCPObjectInOutWithNettyConsumer() throws Exception {
-        Poetry poetry = new Poetry();
-        Poetry response = (Poetry) template.requestBody("netty:tcp://localhost:{{port}}?sync=true", poetry);
-        assertEquals("Dr. Sarojini Naidu", response.getPoet());
-    }
-
     @Override
     protected RouteBuilder createRouteBuilder() throws Exception {
         return new RouteBuilder() {
@@ -59,12 +52,6 @@ public class NettyTCPSyncTest extends BaseNettyTest {
                 from("netty:tcp://localhost:{{port}}?sync=true")
                     .process(new Processor() {
                         public void process(Exchange exchange) throws Exception {
-                            if (exchange.getIn().getBody() instanceof Poetry) {
-                                Poetry poetry = (Poetry) exchange.getIn().getBody();
-                                poetry.setPoet("Dr. Sarojini Naidu");
-                                exchange.getOut().setBody(poetry);
-                                return;
-                            }
                             exchange.getOut().setBody("When You Go Home, Tell Them Of Us And Say, For Your Tomorrow, We Gave Our Today.");
                         }
                     });
diff --git a/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTransferExchangeOptionTest.java b/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTransferExchangeOptionTest.java
index b508b10..e3924ed 100644
--- a/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTransferExchangeOptionTest.java
+++ b/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTransferExchangeOptionTest.java
@@ -18,6 +18,7 @@ package org.apache.camel.component.netty;
 
 import java.nio.charset.Charset;
 
+import org.apache.camel.BindToRegistry;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
@@ -25,8 +26,13 @@ import org.apache.camel.Message;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
 import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.netty.codec.ObjectDecoder;
+import org.apache.camel.component.netty.codec.ObjectEncoder;
 import org.junit.Test;
 
+import io.netty.channel.ChannelHandler;
+import io.netty.handler.codec.serialization.ClassResolvers;
+
 public class NettyTransferExchangeOptionTest extends BaseNettyTest {
 
     @Test
@@ -41,8 +47,23 @@ public class NettyTransferExchangeOptionTest extends BaseNettyTest {
         assertExchange(exchange, true);
     }
 
+    @BindToRegistry("encoder")
+    public ChannelHandler getEncoder() throws Exception {
+        return new ShareableChannelHandlerFactory(new ObjectEncoder());
+    }
+
+    @BindToRegistry("decoder")
+    public ChannelHandler getDecoder() throws Exception {
+        return new DefaultChannelHandlerFactory() {
+            @Override
+            public ChannelHandler newChannelHandler() {
+                return new ObjectDecoder(ClassResolvers.weakCachingResolver(null));
+            }
+        };
+    }
+
     private Exchange sendExchange(boolean setException) throws Exception {
-        Endpoint endpoint = context.getEndpoint("netty:tcp://localhost:{{port}}?transferExchange=true");
+        Endpoint endpoint = context.getEndpoint("netty:tcp://localhost:{{port}}?transferExchange=true&encoders=#encoder&decoders=#decoder");
         Exchange exchange = endpoint.createExchange();
 
         Message message = exchange.getIn();
@@ -93,7 +114,7 @@ public class NettyTransferExchangeOptionTest extends BaseNettyTest {
     protected RouteBuilder createRouteBuilder() {
         return new RouteBuilder() {
             public void configure() {
-                from("netty:tcp://localhost:{{port}}?transferExchange=true").process(new Processor() {
+                from("netty:tcp://localhost:{{port}}?transferExchange=true&encoders=#encoder&decoders=#decoder").process(new Processor() {
                     public void process(Exchange e) throws InterruptedException {
                         assertNotNull(e.getIn().getBody());
                         assertNotNull(e.getIn().getHeaders());
diff --git a/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyUDPAsyncTest.java b/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyUDPAsyncTest.java
index eae497c..5aea806 100644
--- a/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyUDPAsyncTest.java
+++ b/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyUDPAsyncTest.java
@@ -42,7 +42,7 @@ public class NettyUDPAsyncTest extends BaseNettyTest {
         mock.expectedMessageCount(1);
         mock.message(0).body().startsWith("Song Of A Dream".getBytes());
 
-        sendFile("netty:udp://localhost:{{port}}?sync=false");
+        sendFile("netty:udp://localhost:{{port}}?sync=false&udpByteArrayCodec=true");
 
         mock.assertIsSatisfied();
     }
diff --git a/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyUDPObjectSyncTest.java b/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyUDPObjectSyncTest.java
deleted file mode 100644
index 8f18991..0000000
--- a/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyUDPObjectSyncTest.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.netty;
-
-import org.apache.camel.Exchange;
-import org.apache.camel.Processor;
-import org.apache.camel.builder.RouteBuilder;
-import org.junit.Test;
-
-public class NettyUDPObjectSyncTest extends BaseNettyTest {
-
-    @Test
-    public void testUDPObjectInOutWithNettyConsumer() throws Exception {
-        Poetry poetry = new Poetry();
-        Poetry response = template.requestBody("netty:udp://localhost:{{port}}?sync=true", poetry, Poetry.class);
-        assertEquals("Dr. Sarojini Naidu", response.getPoet());
-    }
-
-    @Override
-    protected RouteBuilder createRouteBuilder() throws Exception {
-        return new RouteBuilder() {
-            @Override
-            public void configure() throws Exception {
-                from("netty:udp://localhost:{{port}}?sync=true")
-                    .process(new Processor() {
-                        public void process(Exchange exchange) throws Exception {
-                            Poetry poetry = (Poetry) exchange.getIn().getBody();
-                            poetry.setPoet("Dr. Sarojini Naidu");
-                            exchange.getOut().setBody(poetry);
-                        }
-                    });
-            }
-        };
-    }
-
-}
diff --git a/components/camel-netty/src/test/java/org/apache/camel/component/netty/ObjectSerializationTest.java b/components/camel-netty/src/test/java/org/apache/camel/component/netty/ObjectSerializationTest.java
new file mode 100644
index 0000000..3e3e4ec
--- /dev/null
+++ b/components/camel-netty/src/test/java/org/apache/camel/component/netty/ObjectSerializationTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty;
+
+import java.util.Date;
+import java.util.Properties;
+
+import org.apache.camel.BindToRegistry;
+import org.apache.camel.CamelExecutionException;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.netty.codec.ObjectDecoder;
+import org.apache.camel.component.netty.codec.ObjectEncoder;
+import org.apache.camel.test.AvailablePortFinder;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import io.netty.channel.ChannelHandler;
+import io.netty.handler.codec.serialization.ClassResolvers;
+
+/**
+ * Object Serialization is not allowed by default. However it can be enabled by adding specific encoders/decoders.
+ */
+public class ObjectSerializationTest extends BaseNettyTest {
+
+    private static volatile int port2;
+
+    @BeforeClass
+    public static void initPort2() throws Exception {
+        port2 = AvailablePortFinder.getNextAvailable();
+    }
+
+    @Test
+    public void testObjectSerializationFailureByDefault() throws Exception {
+        Date date = new Date();
+        try {
+            template.requestBody("netty:tcp://localhost:{{port}}?sync=true&encoders=#encoder", date, Date.class);
+            fail("Should have thrown exception");
+        } catch (CamelExecutionException e) {
+            // expected
+        }
+    }
+
+    @Test
+    public void testObjectSerializationAllowedViaDecoder() throws Exception {
+        Date date = new Date();
+        Date receivedDate = template.requestBody("netty:tcp://localhost:{{port2}}?sync=true&encoders=#encoder&decoders=#decoder", date, Date.class);
+        assertEquals(date, receivedDate);
+    }
+
+    @Override
+    @BindToRegistry("prop")
+    public Properties loadProperties() throws Exception {
+
+        Properties prop = new Properties();
+        prop.setProperty("port", "" + getPort());
+        prop.setProperty("port2", "" + port2);
+
+        return prop;
+    }
+
+    @BindToRegistry("encoder")
+    public ChannelHandler getEncoder() throws Exception {
+        return new ShareableChannelHandlerFactory(new ObjectEncoder());
+    }
+
+    @BindToRegistry("decoder")
+    public ChannelHandler getDecoder() throws Exception {
+        return new DefaultChannelHandlerFactory() {
+            @Override
+            public ChannelHandler newChannelHandler() {
+                return new ObjectDecoder(ClassResolvers.weakCachingResolver(null));
+            }
+        };
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+               from("netty:tcp://localhost:{{port}}?sync=true")
+                    .process(new Processor() {
+                        public void process(Exchange exchange) throws Exception {
+                            Object obj = exchange.getIn().getBody();
+                            exchange.getOut().setBody(obj);
+                        }
+                    });
+
+                from("netty:tcp://localhost:{{port2}}?sync=true&decoders=#decoder&encoders=#encoder")
+                    .process(new Processor() {
+                        public void process(Exchange exchange) throws Exception {
+                            Object obj = exchange.getIn().getBody();
+                            exchange.getOut().setBody(obj);
+                        }
+                    });
+            }
+        };
+    }
+
+}
diff --git a/core/camel-endpointdsl/src/main/java/org/apache/camel/builder/endpoint/dsl/NettyEndpointBuilderFactory.java b/core/camel-endpointdsl/src/main/java/org/apache/camel/builder/endpoint/dsl/NettyEndpointBuilderFactory.java
index 62d2d5b..52af006 100644
--- a/core/camel-endpointdsl/src/main/java/org/apache/camel/builder/endpoint/dsl/NettyEndpointBuilderFactory.java
+++ b/core/camel-endpointdsl/src/main/java/org/apache/camel/builder/endpoint/dsl/NettyEndpointBuilderFactory.java
@@ -543,7 +543,8 @@ public interface NettyEndpointBuilderFactory {
         /**
          * Only used for TCP. If no codec is specified, you can use this flag to
          * indicate a text line based codec; if not specified or the value is
-         * false, then Object Serialization is assumed over TCP.
+         * false, then Object Serialization is assumed over TCP - however only
+         * Strings are allowed to be serialized by default.
          * 
          * The option is a: <code>boolean</code> type.
          * 
@@ -557,7 +558,8 @@ public interface NettyEndpointBuilderFactory {
         /**
          * Only used for TCP. If no codec is specified, you can use this flag to
          * indicate a text line based codec; if not specified or the value is
-         * false, then Object Serialization is assumed over TCP.
+         * false, then Object Serialization is assumed over TCP - however only
+         * Strings are allowed to be serialized by default.
          * 
          * The option will be converted to a <code>boolean</code> type.
          * 
@@ -2041,7 +2043,8 @@ public interface NettyEndpointBuilderFactory {
         /**
          * Only used for TCP. If no codec is specified, you can use this flag to
          * indicate a text line based codec; if not specified or the value is
-         * false, then Object Serialization is assumed over TCP.
+         * false, then Object Serialization is assumed over TCP - however only
+         * Strings are allowed to be serialized by default.
          * 
          * The option is a: <code>boolean</code> type.
          * 
@@ -2055,7 +2058,8 @@ public interface NettyEndpointBuilderFactory {
         /**
          * Only used for TCP. If no codec is specified, you can use this flag to
          * indicate a text line based codec; if not specified or the value is
-         * false, then Object Serialization is assumed over TCP.
+         * false, then Object Serialization is assumed over TCP - however only
+         * Strings are allowed to be serialized by default.
          * 
          * The option will be converted to a <code>boolean</code> type.
          * 
@@ -3377,7 +3381,8 @@ public interface NettyEndpointBuilderFactory {
         /**
          * Only used for TCP. If no codec is specified, you can use this flag to
          * indicate a text line based codec; if not specified or the value is
-         * false, then Object Serialization is assumed over TCP.
+         * false, then Object Serialization is assumed over TCP - however only
+         * Strings are allowed to be serialized by default.
          * 
          * The option is a: <code>boolean</code> type.
          * 
@@ -3391,7 +3396,8 @@ public interface NettyEndpointBuilderFactory {
         /**
          * Only used for TCP. If no codec is specified, you can use this flag to
          * indicate a text line based codec; if not specified or the value is
-         * false, then Object Serialization is assumed over TCP.
+         * false, then Object Serialization is assumed over TCP - however only
+         * Strings are allowed to be serialized by default.
          * 
          * The option will be converted to a <code>boolean</code> type.
          *