You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2013/08/07 15:45:55 UTC

[6/6] git commit: CAMEL-6555: camel-netty4 for Netty 4.x based component. Work in progress.

CAMEL-6555: camel-netty4 for Netty 4.x based component. Work in progress.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/1bb756cd
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/1bb756cd
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/1bb756cd

Branch: refs/heads/master
Commit: 1bb756cd23c31811f8c1278aa8f6a38a2ea0abac
Parents: 20adc69
Author: Claus Ibsen <da...@apache.org>
Authored: Wed Aug 7 15:45:36 2013 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Wed Aug 7 15:45:36 2013 +0200

----------------------------------------------------------------------
 components/camel-netty4/pom.xml                 |  77 +++
 .../netty4/CamelNettyThreadNameDeterminer.java  |  36 ++
 .../netty4/ChannelHandlerFactories.java         |  80 +++
 .../component/netty4/ChannelHandlerFactory.java |  35 ++
 .../component/netty4/ClientPipelineFactory.java |  41 ++
 .../netty4/DefaultClientPipelineFactory.java    | 162 ++++++
 .../netty4/DefaultServerPipelineFactory.java    | 182 +++++++
 .../camel/component/netty4/NettyCamelState.java |  46 ++
 .../netty4/NettyClientBossPoolBuilder.java      |  68 +++
 .../camel/component/netty4/NettyComponent.java  | 144 +++++
 .../component/netty4/NettyConfiguration.java    | 432 +++++++++++++++
 .../camel/component/netty4/NettyConstants.java  |  42 ++
 .../camel/component/netty4/NettyConsumer.java   | 109 ++++
 .../netty4/NettyConsumerExceptionHandler.java   |  66 +++
 .../camel/component/netty4/NettyConverter.java  | 124 +++++
 .../camel/component/netty4/NettyEndpoint.java   | 166 ++++++
 .../camel/component/netty4/NettyHelper.java     | 118 ++++
 .../component/netty4/NettyPayloadHelper.java    |  76 +++
 .../camel/component/netty4/NettyProducer.java   | 535 +++++++++++++++++++
 .../NettyServerBootstrapConfiguration.java      | 450 ++++++++++++++++
 .../netty4/NettyServerBootstrapFactory.java     |  73 +++
 .../netty4/NettyServerBossPoolBuilder.java      |  67 +++
 .../netty4/NettyWorkerPoolBuilder.java          |  80 +++
 .../component/netty4/ServerPipelineFactory.java |  41 ++
 .../netty4/ShareableChannelHandlerFactory.java  |  36 ++
 .../netty4/SharedSingletonObjectPool.java       |  86 +++
 .../SingleTCPNettyServerBootstrapFactory.java   | 177 ++++++
 .../SingleUDPNettyServerBootstrapFactory.java   | 182 +++++++
 .../component/netty4/TextLineDelimiter.java     |  26 +
 .../netty4/handlers/ClientChannelHandler.java   | 211 ++++++++
 .../netty4/handlers/ServerChannelHandler.java   | 206 +++++++
 .../handlers/ServerResponseFutureListener.java  |  77 +++
 .../component/netty4/ssl/SSLEngineFactory.java  | 116 ++++
 .../src/main/resources/META-INF/LICENSE.txt     | 203 +++++++
 .../src/main/resources/META-INF/NOTICE.txt      |  11 +
 .../services/org/apache/camel/TypeConverter     |  18 +
 .../services/org/apache/camel/component/netty4  |  17 +
 .../camel-netty4/src/test/data/message1.txt     |   1 +
 .../camel/component/netty4/BaseNettyTest.java   |  95 ++++
 .../netty4/MultipleCodecsSpringTest.java        |  43 ++
 .../component/netty4/MultipleCodecsTest.java    |  86 +++
 .../component/netty4/Netty2978IssueTest.java    | 123 +++++
 .../component/netty4/NettyBacklogTest.java      |  46 ++
 .../NettyComponentWithConfigurationTest.java    |  57 ++
 .../component/netty4/NettyConcurrentTest.java   | 103 ++++
 .../component/netty4/NettyConverterTest.java    |  60 +++
 .../NettyCustomPipelineFactoryAsynchTest.java   | 128 +++++
 .../NettyCustomPipelineFactorySynchTest.java    | 130 +++++
 .../component/netty4/NettyDisconnectTest.java   |  49 ++
 .../component/netty4/NettyFileTcpTest.java      |  50 ++
 .../NettyInOutCloseChannelWhenCompleteTest.java |  50 ++
 .../netty4/NettyInOutFromSedaTest.java          |  56 ++
 .../NettyInOutWithForcedNoResponseTest.java     |  55 ++
 .../netty4/NettyManualEndpointTest.java         |  79 +++
 .../camel/component/netty4/NettyOptionTest.java |  46 ++
 .../netty4/NettyProducerAsyncEndpointTest.java  |  76 +++
 .../netty4/NettyProducerPoolDisabledTest.java   |  50 ++
 .../camel/component/netty4/NettyProxyTest.java  |  61 +++
 .../netty4/NettyRequestTimeoutTest.java         |  84 +++
 .../netty4/NettyReuseConnectionTest.java        |  46 ++
 .../component/netty4/NettySSLClasspathTest.java |  57 ++
 .../netty4/NettySSLClientCertHeadersTest.java   |  74 +++
 .../netty4/NettySSLContextParametersTest.java   |  92 ++++
 .../camel/component/netty4/NettySSLTest.java    |  78 +++
 .../component/netty4/NettySingleCodecTest.java  |  62 +++
 .../component/netty4/NettyTCPAsyncTest.java     |  75 +++
 .../netty4/NettyTCPSyncNotLazyChannelTest.java  |  62 +++
 .../component/netty4/NettyTCPSyncTest.java      |  75 +++
 .../NettyTcpWithInOutUsingPlainSocketTest.java  | 143 +++++
 .../NettyTextlineInOnlyNullDelimiterTest.java   |  48 ++
 .../netty4/NettyTextlineInOnlyTest.java         |  66 +++
 .../NettyTextlineInOutNonBlockingTest.java      |  87 +++
 .../NettyTextlineInOutSynchronousFalseTest.java |  72 +++
 .../NettyTextlineInOutSynchronousTest.java      |  72 +++
 .../netty4/NettyTextlineInOutTest.java          |  50 ++
 .../netty4/NettyTransferExchangeOptionTest.java | 128 +++++
 .../component/netty4/NettyUDPAsyncTest.java     |  74 +++
 .../netty4/NettyUDPLargeMessageInOnlyTest.java  |  68 +++
 .../netty4/NettyUDPObjectSyncTest.java          |  50 ++
 .../component/netty4/NettyUDPSyncTest.java      |  52 ++
 .../NettyUdpWithInOutUsingPlainSocketTest.java  |  80 +++
 ...UseSharedWorkerThreadPoolManyRoutesTest.java |  84 +++
 .../NettyUseSharedWorkerThreadPoolTest.java     | 103 ++++
 .../apache/camel/component/netty4/Poetry.java   |  57 ++
 ...pringNettyUseSharedWorkerThreadPoolTest.java |  51 ++
 .../netty4/UnsharableCodecsConflicts2Test.java  | 123 +++++
 .../netty4/UnsharableCodecsConflictsTest.java   | 129 +++++
 .../src/test/resources/keystore.jks             | Bin 0 -> 1473 bytes
 .../src/test/resources/log4j.properties         |  38 ++
 ...SpringNettyUseSharedWorkerThreadPoolTest.xml |  52 ++
 .../camel/component/netty4/multiple-codecs.xml  |  72 +++
 .../camel-netty4/src/test/resources/test.txt    |  19 +
 92 files changed, 8483 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/1bb756cd/components/camel-netty4/pom.xml
----------------------------------------------------------------------
diff --git a/components/camel-netty4/pom.xml b/components/camel-netty4/pom.xml
new file mode 100644
index 0000000..62ac47d
--- /dev/null
+++ b/components/camel-netty4/pom.xml
@@ -0,0 +1,77 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+	Licensed to the Apache Software Foundation (ASF) under one or more
+	contributor license agreements. See the NOTICE file distributed with
+	this work for additional information regarding copyright ownership.
+	The ASF licenses this file to You under the Apache License, Version
+	2.0 (the "License"); you may not use this file except in compliance
+	with the License. You may obtain a copy of the License at
+
+	http://www.apache.org/licenses/LICENSE-2.0 Unless required by
+	applicable law or agreed to in writing, software distributed under the
+	License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+	CONDITIONS OF ANY KIND, either express or implied. See the License for
+	the specific language governing permissions and limitations under the
+	License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.camel</groupId>
+    <artifactId>components</artifactId>
+    <version>2.12-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>camel-netty4</artifactId>
+  <packaging>bundle</packaging>
+  <name>Camel :: Netty</name>
+  <description>Camel Netty 4.x NIO based socket communication component</description>
+
+  <properties>
+    <camel.osgi.export.pkg>
+      org.apache.camel.component.netty4.*
+    </camel.osgi.export.pkg>
+    <camel.osgi.export.service>org.apache.camel.spi.ComponentResolver;component=netty4</camel.osgi.export.service>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.camel</groupId>
+      <artifactId>camel-core</artifactId>
+    </dependency>
+    <!-- TODO: use netty-all and ${netty-version} when upgrading to Netty 4.x -->
+    <dependency>
+      <groupId>io.netty</groupId>
+      <artifactId>netty</artifactId>
+      <version>${netty3-version}</version>
+    </dependency>
+    <dependency>
+      <groupId>commons-pool</groupId>
+      <artifactId>commons-pool</artifactId>
+      <version>${commons-pool-version}</version>
+    </dependency>
+
+    <!-- testing -->
+    <dependency>
+      <groupId>org.apache.camel</groupId>
+      <artifactId>camel-test-spring</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <!-- logging -->
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-log4j12</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+  </dependencies>
+
+</project>

http://git-wip-us.apache.org/repos/asf/camel/blob/1bb756cd/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/CamelNettyThreadNameDeterminer.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/CamelNettyThreadNameDeterminer.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/CamelNettyThreadNameDeterminer.java
new file mode 100644
index 0000000..9191d4f
--- /dev/null
+++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/CamelNettyThreadNameDeterminer.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty4;
+
+import org.apache.camel.util.concurrent.ThreadHelper;
+import org.jboss.netty.util.ThreadNameDeterminer;
+
+public class CamelNettyThreadNameDeterminer implements ThreadNameDeterminer {
+
+    private final String pattern;
+    private final String name;
+
+    public CamelNettyThreadNameDeterminer(String pattern, String name) {
+        this.pattern = pattern;
+        this.name = name;
+    }
+
+    @Override
+    public String determineThreadName(String currentThreadName, String proposedThreadName) throws Exception {
+        return ThreadHelper.resolveThreadName(pattern, name);
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/1bb756cd/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/ChannelHandlerFactories.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/ChannelHandlerFactories.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/ChannelHandlerFactories.java
new file mode 100644
index 0000000..53e961f
--- /dev/null
+++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/ChannelHandlerFactories.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty4;
+
+import java.nio.charset.Charset;
+
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.channel.ChannelHandler;
+import org.jboss.netty.handler.codec.frame.DelimiterBasedFrameDecoder;
+import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder;
+import org.jboss.netty.handler.codec.serialization.ClassResolvers;
+import org.jboss.netty.handler.codec.serialization.ObjectDecoder;
+import org.jboss.netty.handler.codec.serialization.ObjectEncoder;
+import org.jboss.netty.handler.codec.string.StringDecoder;
+import org.jboss.netty.handler.codec.string.StringEncoder;
+
+/**
+ * Helper to create commonly used {@link ChannelHandlerFactory} instances.
+ */
+public final class ChannelHandlerFactories {
+
+    private ChannelHandlerFactories() {
+    }
+
+    public static ChannelHandlerFactory newStringEncoder(Charset charset) {
+        return new ShareableChannelHandlerFactory(new StringEncoder(charset));
+    }
+
+    public static ChannelHandlerFactory newStringDecoder(Charset charset) {
+        return new ShareableChannelHandlerFactory(new StringDecoder(charset));
+    }
+
+    public static ChannelHandlerFactory newObjectDecoder() {
+        return new ChannelHandlerFactory() {
+            @Override
+            public ChannelHandler newChannelHandler() {
+                return new ObjectDecoder(ClassResolvers.weakCachingResolver(null));
+            }
+        };
+    }
+
+    public static ChannelHandlerFactory newObjectEncoder() {
+        return new ShareableChannelHandlerFactory(new ObjectEncoder());
+    }
+
+    public static ChannelHandlerFactory newDelimiterBasedFrameDecoder(final int maxFrameLength, final ChannelBuffer[] delimiters) {
+        return new ChannelHandlerFactory() {
+            @Override
+            public ChannelHandler newChannelHandler() {
+                return new DelimiterBasedFrameDecoder(maxFrameLength, true, delimiters);
+            }
+        };
+    }
+
+    public static ChannelHandlerFactory newLengthFieldBasedFrameDecoder(final int maxFrameLength, final int lengthFieldOffset,
+                                                                        final int lengthFieldLength, final int lengthAdjustment,
+                                                                        final int initialBytesToStrip) {
+        return new ChannelHandlerFactory() {
+            @Override
+            public ChannelHandler newChannelHandler() {
+                return new LengthFieldBasedFrameDecoder(maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip);
+            }
+        };
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/1bb756cd/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/ChannelHandlerFactory.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/ChannelHandlerFactory.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/ChannelHandlerFactory.java
new file mode 100644
index 0000000..d05323d
--- /dev/null
+++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/ChannelHandlerFactory.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty4;
+
+import org.jboss.netty.channel.ChannelHandler;
+
+/**
+ * Factory for creating new {@link ChannelHandler} used for non shareable
+ * encoders and decoders configured on the Camel {@link NettyComponent}.
+ * <p/>
+ * This is needed as Netty's {@link ChannelHandler} is often not shareable
+ * and therefore a new instance must be created when a handler is being
+ * added to a pipeline.
+ */
+public interface ChannelHandlerFactory extends ChannelHandler {
+
+    /**
+     * Creates a new {@link ChannelHandler} to be used.
+     */
+    ChannelHandler newChannelHandler();
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/1bb756cd/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/ClientPipelineFactory.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/ClientPipelineFactory.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/ClientPipelineFactory.java
new file mode 100644
index 0000000..9921b31
--- /dev/null
+++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/ClientPipelineFactory.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty4;
+
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+
+/**
+ * Factory to create {@link ChannelPipeline} for clients, eg {@link NettyProducer}.
+ * <p/>
+ * Implementators must support creating a new instance of this factory which is associated
+ * to the given {@link NettyProducer} using the {@link #createPipelineFactory(NettyProducer)}
+ * method.
+ *
+ * @see ChannelPipelineFactory
+ */
+public abstract class ClientPipelineFactory implements ChannelPipelineFactory {
+
+    /**
+     * Creates a new {@link ClientPipelineFactory} using the given {@link NettyProducer}
+     *
+     * @param producer the associated producers
+     * @return the {@link ClientPipelineFactory} associated to ghe given producer.
+     */
+    public abstract ClientPipelineFactory createPipelineFactory(NettyProducer producer);
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/1bb756cd/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/DefaultClientPipelineFactory.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/DefaultClientPipelineFactory.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/DefaultClientPipelineFactory.java
new file mode 100644
index 0000000..e80f96a
--- /dev/null
+++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/DefaultClientPipelineFactory.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty4;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+
+import org.apache.camel.component.netty4.handlers.ClientChannelHandler;
+import org.apache.camel.component.netty4.ssl.SSLEngineFactory;
+import org.apache.camel.util.ObjectHelper;
+import org.jboss.netty.channel.ChannelHandler;
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.handler.ssl.SslHandler;
+import org.jboss.netty.handler.timeout.ReadTimeoutHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DefaultClientPipelineFactory extends ClientPipelineFactory  {
+    private static final Logger LOG = LoggerFactory.getLogger(DefaultClientPipelineFactory.class);
+
+    private final NettyProducer producer;
+    private SSLContext sslContext;
+
+    public DefaultClientPipelineFactory(NettyProducer producer) {
+        this.producer = producer;
+        try {
+            this.sslContext = createSSLContext(producer);
+        } catch (Exception e) {
+            throw ObjectHelper.wrapRuntimeCamelException(e);
+        }
+    }
+
+    public ChannelPipeline getPipeline() throws Exception {
+        // create a new pipeline
+        ChannelPipeline channelPipeline = Channels.pipeline();
+
+        SslHandler sslHandler = configureClientSSLOnDemand();
+        if (sslHandler != null) {
+            // must close on SSL exception
+            sslHandler.setCloseOnSSLException(true);
+            LOG.debug("Client SSL handler configured and added to the ChannelPipeline: {}", sslHandler);
+            addToPipeline("ssl", channelPipeline, sslHandler);
+        }
+
+        List<ChannelHandler> decoders = producer.getConfiguration().getDecoders();
+        for (int x = 0; x < decoders.size(); x++) {
+            ChannelHandler decoder = decoders.get(x);
+            if (decoder instanceof ChannelHandlerFactory) {
+                // use the factory to create a new instance of the channel as it may not be shareable
+                decoder = ((ChannelHandlerFactory) decoder).newChannelHandler();
+            }
+            addToPipeline("decoder-" + x, channelPipeline, decoder);
+        }
+
+        List<ChannelHandler> encoders = producer.getConfiguration().getEncoders();
+        for (int x = 0; x < encoders.size(); x++) {
+            ChannelHandler encoder = encoders.get(x);
+            if (encoder instanceof ChannelHandlerFactory) {
+                // use the factory to create a new instance of the channel as it may not be shareable
+                encoder = ((ChannelHandlerFactory) encoder).newChannelHandler();
+            }
+            addToPipeline("encoder-" + x, channelPipeline, encoder);
+        }
+
+        // do we use request timeout?
+        if (producer.getConfiguration().getRequestTimeout() > 0) {
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Using request timeout {} millis", producer.getConfiguration().getRequestTimeout());
+            }
+            ChannelHandler timeout = new ReadTimeoutHandler(NettyComponent.getTimer(), producer.getConfiguration().getRequestTimeout(), TimeUnit.MILLISECONDS);
+            addToPipeline("timeout", channelPipeline, timeout);
+        }
+
+        // our handler must be added last
+        addToPipeline("handler", channelPipeline, new ClientChannelHandler(producer));
+
+        LOG.trace("Created ChannelPipeline: {}", channelPipeline);
+        return channelPipeline;
+    }
+
+    private void addToPipeline(String name, ChannelPipeline pipeline, ChannelHandler handler) {
+        pipeline.addLast(name, handler);
+    }
+
+    private SSLContext createSSLContext(NettyProducer producer) throws Exception {
+        if (!producer.getConfiguration().isSsl()) {
+            return null;
+        }
+
+        // create ssl context once
+        if (producer.getConfiguration().getSslContextParameters() != null) {
+            SSLContext context = producer.getConfiguration().getSslContextParameters().createSSLContext();
+            return context;
+        }
+
+        return null;
+    }
+
+    private SslHandler configureClientSSLOnDemand() throws Exception {
+        if (!producer.getConfiguration().isSsl()) {
+            return null;
+        }
+
+        if (producer.getConfiguration().getSslHandler() != null) {
+            return producer.getConfiguration().getSslHandler();
+        } else if (sslContext != null) {
+            SSLEngine engine = sslContext.createSSLEngine();
+            engine.setUseClientMode(true);
+            return new SslHandler(engine);
+        } else {
+            if (producer.getConfiguration().getKeyStoreFile() == null && producer.getConfiguration().getKeyStoreResource() == null) {
+                LOG.debug("keystorefile is null");
+            }
+            if (producer.getConfiguration().getTrustStoreFile() == null && producer.getConfiguration().getTrustStoreResource() == null) {
+                LOG.debug("truststorefile is null");
+            }
+            if (producer.getConfiguration().getPassphrase().toCharArray() == null) {
+                LOG.debug("passphrase is null");
+            }
+            SSLEngineFactory sslEngineFactory;
+            if (producer.getConfiguration().getKeyStoreFile() != null || producer.getConfiguration().getTrustStoreFile() != null) {
+                sslEngineFactory = new SSLEngineFactory(
+                    producer.getConfiguration().getKeyStoreFormat(),
+                    producer.getConfiguration().getSecurityProvider(),
+                    producer.getConfiguration().getKeyStoreFile(),
+                    producer.getConfiguration().getTrustStoreFile(),
+                    producer.getConfiguration().getPassphrase().toCharArray());
+            } else {
+                sslEngineFactory = new SSLEngineFactory(producer.getContext().getClassResolver(),
+                        producer.getConfiguration().getKeyStoreFormat(),
+                        producer.getConfiguration().getSecurityProvider(),
+                        producer.getConfiguration().getKeyStoreResource(),
+                        producer.getConfiguration().getTrustStoreResource(),
+                        producer.getConfiguration().getPassphrase().toCharArray());
+            }
+            SSLEngine sslEngine = sslEngineFactory.createClientSSLEngine();
+            return new SslHandler(sslEngine);
+        }
+    }
+
+    @Override
+    public ClientPipelineFactory createPipelineFactory(NettyProducer producer) {
+        return new DefaultClientPipelineFactory(producer);
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/1bb756cd/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/DefaultServerPipelineFactory.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/DefaultServerPipelineFactory.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/DefaultServerPipelineFactory.java
new file mode 100644
index 0000000..da457c4
--- /dev/null
+++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/DefaultServerPipelineFactory.java
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty4;
+
+import java.util.List;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+
+import org.apache.camel.component.netty4.handlers.ServerChannelHandler;
+import org.apache.camel.component.netty4.ssl.SSLEngineFactory;
+import org.apache.camel.util.ObjectHelper;
+import org.jboss.netty.channel.ChannelHandler;
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.handler.execution.ExecutionHandler;
+import org.jboss.netty.handler.ssl.SslHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DefaultServerPipelineFactory extends ServerPipelineFactory {
+    private static final Logger LOG = LoggerFactory.getLogger(DefaultServerPipelineFactory.class);
+
+    private NettyConsumer consumer;
+    private SSLContext sslContext;
+
+    public DefaultServerPipelineFactory(NettyServerBootstrapConfiguration configuration) {
+        this.consumer = null;
+        try {
+            this.sslContext = createSSLContext(configuration);
+        } catch (Exception e) {
+            throw ObjectHelper.wrapRuntimeCamelException(e);
+        }
+
+        if (sslContext != null) {
+            LOG.info("Created SslContext {}", sslContext);
+        }
+    }
+
+    public DefaultServerPipelineFactory(NettyConsumer consumer) {
+        this.consumer = consumer;
+        try {
+            this.sslContext = createSSLContext(consumer.getConfiguration());
+        } catch (Exception e) {
+            throw ObjectHelper.wrapRuntimeCamelException(e);
+        }
+
+        if (sslContext != null) {
+            LOG.info("Created SslContext {}", sslContext);
+        }
+    }
+
+    @Override
+    public ChannelPipeline getPipeline() throws Exception {
+        ChannelPipeline channelPipeline = Channels.pipeline();
+
+        SslHandler sslHandler = configureServerSSLOnDemand();
+        if (sslHandler != null) {
+            // must close on SSL exception
+            sslHandler.setCloseOnSSLException(true);
+            LOG.debug("Server SSL handler configured and added as an interceptor against the ChannelPipeline: {}", sslHandler);
+            addToPipeline("ssl", channelPipeline, sslHandler);
+        }
+
+        List<ChannelHandler> encoders = consumer.getConfiguration().getEncoders();
+        for (int x = 0; x < encoders.size(); x++) {
+            ChannelHandler encoder = encoders.get(x);
+            if (encoder instanceof ChannelHandlerFactory) {
+                // use the factory to create a new instance of the channel as it may not be shareable
+                encoder = ((ChannelHandlerFactory) encoder).newChannelHandler();
+            }
+            addToPipeline("encoder-" + x, channelPipeline, encoder);
+        }
+
+        List<ChannelHandler> decoders = consumer.getConfiguration().getDecoders();
+        for (int x = 0; x < decoders.size(); x++) {
+            ChannelHandler decoder = decoders.get(x);
+            if (decoder instanceof ChannelHandlerFactory) {
+                // use the factory to create a new instance of the channel as it may not be shareable
+                decoder = ((ChannelHandlerFactory) decoder).newChannelHandler();
+            }
+            addToPipeline("decoder-" + x, channelPipeline, decoder);
+        }
+
+        if (consumer.getConfiguration().isOrderedThreadPoolExecutor()) {
+            // this must be added just before the ServerChannelHandler
+            // use ordered thread pool, to ensure we process the events in order, and can send back
+            // replies in the expected order. eg this is required by TCP.
+            // and use a Camel thread factory so we have consistent thread namings
+            ExecutionHandler executionHandler = new ExecutionHandler(consumer.getEndpoint().getComponent().getExecutorService());
+            addToPipeline("executionHandler", channelPipeline, executionHandler);
+            LOG.debug("Using OrderedMemoryAwareThreadPoolExecutor with core pool size: {}", consumer.getConfiguration().getMaximumPoolSize());
+        }
+
+        // our handler must be added last
+        addToPipeline("handler", channelPipeline, new ServerChannelHandler(consumer));
+
+        LOG.trace("Created ChannelPipeline: {}", channelPipeline);
+        return channelPipeline;
+    }
+
+    private void addToPipeline(String name, ChannelPipeline pipeline, ChannelHandler handler) {
+        pipeline.addLast(name, handler);
+    }
+
+    private SSLContext createSSLContext(NettyServerBootstrapConfiguration configuration) throws Exception {
+        if (!configuration.isSsl()) {
+            return null;
+        }
+
+        // create ssl context once
+        if (configuration.getSslContextParameters() != null) {
+            SSLContext context = configuration.getSslContextParameters().createSSLContext();
+            return context;
+        }
+
+        return null;
+    }
+
+    private SslHandler configureServerSSLOnDemand() throws Exception {
+        if (!consumer.getConfiguration().isSsl()) {
+            return null;
+        }
+
+        if (consumer.getConfiguration().getSslHandler() != null) {
+            return consumer.getConfiguration().getSslHandler();
+        } else if (sslContext != null) {
+            SSLEngine engine = sslContext.createSSLEngine();
+            engine.setUseClientMode(false);
+            engine.setNeedClientAuth(consumer.getConfiguration().isNeedClientAuth());
+            return new SslHandler(engine);
+        } else {
+            if (consumer.getConfiguration().getKeyStoreFile() == null && consumer.getConfiguration().getKeyStoreResource() == null) {
+                LOG.debug("keystorefile is null");
+            }
+            if (consumer.getConfiguration().getTrustStoreFile() == null && consumer.getConfiguration().getTrustStoreResource() == null) {
+                LOG.debug("truststorefile is null");
+            }
+            if (consumer.getConfiguration().getPassphrase().toCharArray() == null) {
+                LOG.debug("passphrase is null");
+            }
+            SSLEngineFactory sslEngineFactory;
+            if (consumer.getConfiguration().getKeyStoreFile() != null || consumer.getConfiguration().getTrustStoreFile() != null) {
+                sslEngineFactory = new SSLEngineFactory(
+                        consumer.getConfiguration().getKeyStoreFormat(),
+                        consumer.getConfiguration().getSecurityProvider(),
+                        consumer.getConfiguration().getKeyStoreFile(),
+                        consumer.getConfiguration().getTrustStoreFile(),
+                        consumer.getConfiguration().getPassphrase().toCharArray());
+            } else {
+                sslEngineFactory = new SSLEngineFactory(consumer.getContext().getClassResolver(),
+                        consumer.getConfiguration().getKeyStoreFormat(),
+                        consumer.getConfiguration().getSecurityProvider(),
+                        consumer.getConfiguration().getKeyStoreResource(),
+                        consumer.getConfiguration().getTrustStoreResource(),
+                        consumer.getConfiguration().getPassphrase().toCharArray());
+            }
+            SSLEngine sslEngine = sslEngineFactory.createServerSSLEngine();
+            sslEngine.setUseClientMode(false);
+            sslEngine.setNeedClientAuth(consumer.getConfiguration().isNeedClientAuth());
+            return new SslHandler(sslEngine);
+        }
+    }
+
+    @Override
+    public ServerPipelineFactory createPipelineFactory(NettyConsumer consumer) {
+        return new DefaultServerPipelineFactory(consumer);
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/1bb756cd/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyCamelState.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyCamelState.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyCamelState.java
new file mode 100644
index 0000000..19c4bb1
--- /dev/null
+++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyCamelState.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty4;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
+
+/**
+ * Stores state for {@link NettyProducer} when sending messages.
+ * <p/>
+ * This allows the {@link org.apache.camel.component.netty.handlers.ClientChannelHandler} to access
+ * this state, which is needed so we can get hold of the current {@link Exchange} and the
+ * {@link AsyncCallback} so we can continue routing the message in the Camel routing engine.
+ */
+public final class NettyCamelState {
+
+    private final Exchange exchange;
+    private final AsyncCallback callback;
+
+    public NettyCamelState(AsyncCallback callback, Exchange exchange) {
+        this.callback = callback;
+        this.exchange = exchange;
+    }
+
+    public AsyncCallback getCallback() {
+        return callback;
+    }
+
+    public Exchange getExchange() {
+        return exchange;
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/1bb756cd/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyClientBossPoolBuilder.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyClientBossPoolBuilder.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyClientBossPoolBuilder.java
new file mode 100644
index 0000000..86d31d6
--- /dev/null
+++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyClientBossPoolBuilder.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty4;
+
+import java.util.concurrent.Executors;
+
+import org.jboss.netty.channel.socket.nio.BossPool;
+import org.jboss.netty.channel.socket.nio.NioClientBossPool;
+import org.jboss.netty.util.HashedWheelTimer;
+
+/**
+ * A builder to create Netty {@link org.jboss.netty.channel.socket.nio.BossPool} which can be used for sharing boos pools
+ * with multiple Netty {@link NettyServerBootstrapFactory} server bootstrap configurations.
+ */
+public final class NettyClientBossPoolBuilder {
+
+    private String name = "NettyClientBoss";
+    private String pattern;
+    private int bossCount = 1;
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    public void setPattern(String pattern) {
+        this.pattern = pattern;
+    }
+
+    public void setBossCount(int bossCount) {
+        this.bossCount = bossCount;
+    }
+
+    public NettyClientBossPoolBuilder withName(String name) {
+        setName(name);
+        return this;
+    }
+
+    public NettyClientBossPoolBuilder withPattern(String pattern) {
+        setPattern(pattern);
+        return this;
+    }
+
+    public NettyClientBossPoolBuilder withBossCount(int bossCount) {
+        setBossCount(bossCount);
+        return this;
+    }
+
+    /**
+     * Creates a new boss pool.
+     */
+    BossPool build() {
+        return new NioClientBossPool(Executors.newCachedThreadPool(), bossCount, new HashedWheelTimer(), new CamelNettyThreadNameDeterminer(pattern, name));
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/1bb756cd/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyComponent.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyComponent.java
new file mode 100644
index 0000000..67a533b
--- /dev/null
+++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyComponent.java
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty4;
+
+import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Endpoint;
+import org.apache.camel.impl.DefaultComponent;
+import org.apache.camel.util.IntrospectionSupport;
+import org.apache.camel.util.concurrent.CamelThreadFactory;
+import org.jboss.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor;
+import org.jboss.netty.util.HashedWheelTimer;
+import org.jboss.netty.util.Timer;
+
+public class NettyComponent extends DefaultComponent {
+    // use a shared timer for Netty (see javadoc for HashedWheelTimer)
+    private static volatile Timer timer;
+    private NettyConfiguration configuration;
+    private OrderedMemoryAwareThreadPoolExecutor executorService;
+
+    public NettyComponent() {
+    }
+
+    public NettyComponent(CamelContext context) {
+        super(context);
+    }
+
+    @Override
+    protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
+        NettyConfiguration config;
+        if (configuration != null) {
+            config = configuration.copy();
+        } else {
+            config = new NettyConfiguration();
+        }
+        config = parseConfiguration(config, remaining, parameters);
+
+        // merge any custom bootstrap configuration on the config
+        NettyServerBootstrapConfiguration bootstrapConfiguration = resolveAndRemoveReferenceParameter(parameters, "bootstrapConfiguration", NettyServerBootstrapConfiguration.class);
+        if (bootstrapConfiguration != null) {
+            Map<String, Object> options = new HashMap<String, Object>();
+            if (IntrospectionSupport.getProperties(bootstrapConfiguration, options, null, false)) {
+                IntrospectionSupport.setProperties(getCamelContext().getTypeConverter(), config, options);
+            }
+        }
+
+        // validate config
+        config.validateConfiguration();
+
+        NettyEndpoint nettyEndpoint = new NettyEndpoint(remaining, this, config);
+        nettyEndpoint.setTimer(getTimer());
+        setProperties(nettyEndpoint.getConfiguration(), parameters);
+        return nettyEndpoint;
+    }
+
+    /**
+     * Parses the configuration
+     *
+     * @return the parsed and valid configuration to use
+     */
+    protected NettyConfiguration parseConfiguration(NettyConfiguration configuration, String remaining, Map<String, Object> parameters) throws Exception {
+        configuration.parseURI(new URI(remaining), parameters, this, "tcp", "udp");
+        return configuration;
+    }
+
+    public NettyConfiguration getConfiguration() {
+        return configuration;
+    }
+
+    public void setConfiguration(NettyConfiguration configuration) {
+        this.configuration = configuration;
+    }
+
+    public static Timer getTimer() {
+        return timer;
+    }
+
+    public synchronized OrderedMemoryAwareThreadPoolExecutor getExecutorService() {
+        if (executorService == null) {
+            executorService = createExecutorService();
+        }
+        return executorService;
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        if (timer == null) {
+            timer = new HashedWheelTimer();
+        }
+
+        if (configuration == null) {
+            configuration = new NettyConfiguration();
+        }
+        if (configuration.isOrderedThreadPoolExecutor()) {
+            executorService = createExecutorService();
+        }
+
+        super.doStart();
+    }
+
+    protected OrderedMemoryAwareThreadPoolExecutor createExecutorService() {
+        // use ordered thread pool, to ensure we process the events in order, and can send back
+        // replies in the expected order. eg this is required by TCP.
+        // and use a Camel thread factory so we have consistent thread namings
+        // we should use a shared thread pool as recommended by Netty
+        String pattern = getCamelContext().getExecutorServiceManager().getThreadNamePattern();
+        ThreadFactory factory = new CamelThreadFactory(pattern, "NettyOrderedWorker", true);
+        return new OrderedMemoryAwareThreadPoolExecutor(configuration.getMaximumPoolSize(),
+                0L, 0L, 30, TimeUnit.SECONDS, factory);
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        timer.stop();
+        timer = null;
+
+        if (executorService != null) {
+            getCamelContext().getExecutorServiceManager().shutdownNow(executorService);
+            executorService = null;
+        }
+
+        super.doStop();
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/1bb756cd/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyConfiguration.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyConfiguration.java
new file mode 100644
index 0000000..37a3df8
--- /dev/null
+++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyConfiguration.java
@@ -0,0 +1,432 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty4;
+
+import java.io.File;
+import java.net.URI;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.camel.LoggingLevel;
+import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.util.EndpointHelper;
+import org.apache.camel.util.IntrospectionSupport;
+import org.apache.camel.util.ObjectHelper;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.channel.ChannelHandler;
+import org.jboss.netty.handler.codec.frame.Delimiters;
+import org.jboss.netty.handler.ssl.SslHandler;
+import org.jboss.netty.util.CharsetUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class NettyConfiguration extends NettyServerBootstrapConfiguration implements Cloneable {
+    private static final Logger LOG = LoggerFactory.getLogger(NettyConfiguration.class);
+
+    private long requestTimeout;
+    private boolean sync = true;
+    private boolean textline;
+    private TextLineDelimiter delimiter = TextLineDelimiter.LINE;
+    private boolean autoAppendDelimiter = true;
+    private int decoderMaxLineLength = 1024;
+    private String encoding;
+    private List<ChannelHandler> encoders = new ArrayList<ChannelHandler>();
+    private List<ChannelHandler> decoders = new ArrayList<ChannelHandler>();
+    private boolean disconnect;
+    private boolean lazyChannelCreation = true;
+    private boolean transferExchange;
+    private boolean disconnectOnNoReply = true;
+    private LoggingLevel noReplyLogLevel = LoggingLevel.WARN;
+    private LoggingLevel serverExceptionCaughtLogLevel = LoggingLevel.WARN;
+    private LoggingLevel serverClosedChannelExceptionCaughtLogLevel = LoggingLevel.DEBUG;
+    private boolean allowDefaultCodec = true;
+    private ClientPipelineFactory clientPipelineFactory;
+    private int maximumPoolSize = 16;
+    private boolean orderedThreadPoolExecutor = true;
+    private int producerPoolMaxActive = -1;
+    private int producerPoolMinIdle;
+    private int producerPoolMaxIdle = 100;
+    private long producerPoolMinEvictableIdle = 5 * 60 * 1000L;
+    private boolean producerPoolEnabled = true;
+
+    /**
+     * Returns a copy of this configuration
+     */
+    public NettyConfiguration copy() {
+        try {
+            NettyConfiguration answer = (NettyConfiguration) clone();
+            // make sure the lists is copied in its own instance
+            List<ChannelHandler> encodersCopy = new ArrayList<ChannelHandler>(encoders);
+            answer.setEncoders(encodersCopy);
+            List<ChannelHandler> decodersCopy = new ArrayList<ChannelHandler>(decoders);
+            answer.setDecoders(decodersCopy);
+            return answer;
+        } catch (CloneNotSupportedException e) {
+            throw new RuntimeCamelException(e);
+        }
+    }
+
+    public void validateConfiguration() {
+        // validate that the encoders is either shareable or is a handler factory
+        for (ChannelHandler encoder : encoders) {
+            if (encoder instanceof ChannelHandlerFactory) {
+                continue;
+            }
+            if (ObjectHelper.getAnnotation(encoder, ChannelHandler.Sharable.class) != null) {
+                continue;
+            }
+            LOG.warn("The encoder {} is not @Shareable or an ChannelHandlerFactory instance. The encoder cannot safely be used.", encoder);
+        }
+
+        // validate that the decoders is either shareable or is a handler factory
+        for (ChannelHandler decoder : decoders) {
+            if (decoder instanceof ChannelHandlerFactory) {
+                continue;
+            }
+            if (ObjectHelper.getAnnotation(decoder, ChannelHandler.Sharable.class) != null) {
+                continue;
+            }
+            LOG.warn("The decoder {} is not @Shareable or an ChannelHandlerFactory instance. The decoder cannot safely be used.", decoder);
+        }
+        if (sslHandler != null) {
+            boolean factory = sslHandler instanceof ChannelHandlerFactory;
+            boolean shareable = ObjectHelper.getAnnotation(sslHandler, ChannelHandler.Sharable.class) != null;
+            if (!factory && !shareable) {
+                LOG.warn("The sslHandler {} is not @Shareable or an ChannelHandlerFactory instance. The sslHandler cannot safely be used.", sslHandler);
+            }
+        }
+    }
+
+    public void parseURI(URI uri, Map<String, Object> parameters, NettyComponent component, String... supportedProtocols) throws Exception {
+        protocol = uri.getScheme();
+
+        boolean found = false;
+        for (String supportedProtocol : supportedProtocols) {
+            if (protocol != null && protocol.equalsIgnoreCase(supportedProtocol)) {
+                found = true;
+                break;
+            }
+        }
+
+        if (!found) {
+            throw new IllegalArgumentException("Unrecognized Netty protocol: " + protocol + " for uri: " + uri);
+        }
+
+        setHost(uri.getHost());
+        setPort(uri.getPort());
+
+        ssl = component.getAndRemoveParameter(parameters, "ssl", boolean.class, false);
+        sslHandler = component.resolveAndRemoveReferenceParameter(parameters, "sslHandler", SslHandler.class, sslHandler);
+        passphrase = component.getAndRemoveParameter(parameters, "passphrase", String.class, passphrase);
+        keyStoreFormat = component.getAndRemoveParameter(parameters, "keyStoreFormat", String.class, keyStoreFormat == null ? "JKS" : keyStoreFormat);
+        securityProvider = component.getAndRemoveParameter(parameters, "securityProvider", String.class, securityProvider == null ? "SunX509" : securityProvider);
+        keyStoreFile = component.resolveAndRemoveReferenceParameter(parameters, "keyStoreFile", File.class, keyStoreFile);
+        trustStoreFile = component.resolveAndRemoveReferenceParameter(parameters, "trustStoreFile", File.class, trustStoreFile);
+        keyStoreResource = component.getAndRemoveParameter(parameters, "keyStoreResource", String.class, keyStoreResource);
+        trustStoreResource = component.getAndRemoveParameter(parameters, "trustStoreResource", String.class, trustStoreResource);
+        clientPipelineFactory = component.resolveAndRemoveReferenceParameter(parameters, "clientPipelineFactory", ClientPipelineFactory.class, clientPipelineFactory);
+        serverPipelineFactory = component.resolveAndRemoveReferenceParameter(parameters, "serverPipelineFactory", ServerPipelineFactory.class, serverPipelineFactory);
+
+        // set custom encoders and decoders first
+        List<ChannelHandler> referencedEncoders = component.resolveAndRemoveReferenceListParameter(parameters, "encoders", ChannelHandler.class, null);
+        addToHandlersList(encoders, referencedEncoders, ChannelHandler.class);
+        List<ChannelHandler> referencedDecoders = component.resolveAndRemoveReferenceListParameter(parameters, "decoders", ChannelHandler.class, null);
+        addToHandlersList(decoders, referencedDecoders, ChannelHandler.class);
+
+        // then set parameters with the help of the camel context type converters
+        EndpointHelper.setReferenceProperties(component.getCamelContext(), this, parameters);
+        EndpointHelper.setProperties(component.getCamelContext(), this, parameters);
+
+        // additional netty options, we don't want to store an empty map, so set it as null if empty
+        options = IntrospectionSupport.extractProperties(parameters, "option.");
+        if (options !=  null && options.isEmpty()) {
+            options = null;
+        }
+
+        // add default encoders and decoders
+        if (encoders.isEmpty() && decoders.isEmpty()) {
+            if (allowDefaultCodec) {
+                // are we textline or object?
+                if (isTextline()) {
+                    Charset charset = getEncoding() != null ? Charset.forName(getEncoding()) : CharsetUtil.UTF_8;
+                    encoders.add(ChannelHandlerFactories.newStringEncoder(charset));
+                    ChannelBuffer[] delimiters = delimiter == TextLineDelimiter.LINE ? Delimiters.lineDelimiter() : Delimiters.nulDelimiter();
+                    decoders.add(ChannelHandlerFactories.newDelimiterBasedFrameDecoder(decoderMaxLineLength, delimiters));
+                    decoders.add(ChannelHandlerFactories.newStringDecoder(charset));
+
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("Using textline encoders and decoders with charset: {}, delimiter: {} and decoderMaxLineLength: {}", 
+                                new Object[]{charset, delimiter, decoderMaxLineLength});
+                    }
+                } else {
+                    // object serializable is then used
+                    encoders.add(ChannelHandlerFactories.newObjectEncoder());
+                    decoders.add(ChannelHandlerFactories.newObjectDecoder());
+
+                    LOG.debug("Using object encoders and decoders");
+                }
+            } else {
+                LOG.debug("No encoders and decoders will be used");
+            }
+        } else {
+            LOG.debug("Using configured encoders and/or decoders");
+        }
+    }
+
+    public String getCharsetName() {
+        if (encoding == null) {
+            return null;
+        }
+        if (!Charset.isSupported(encoding)) {
+            throw new IllegalArgumentException("The encoding: " + encoding + " is not supported");
+        }
+
+        return Charset.forName(encoding).name();
+    }
+
+    public long getRequestTimeout() {
+        return requestTimeout;
+    }
+
+    public void setRequestTimeout(long requestTimeout) {
+        this.requestTimeout = requestTimeout;
+    }
+
+    public boolean isSync() {
+        return sync;
+    }
+
+    public void setSync(boolean sync) {
+        this.sync = sync;
+    }
+
+    public boolean isTextline() {
+        return textline;
+    }
+
+    public void setTextline(boolean textline) {
+        this.textline = textline;
+    }
+
+    public int getDecoderMaxLineLength() {
+        return decoderMaxLineLength;
+    }
+
+    public void setDecoderMaxLineLength(int decoderMaxLineLength) {
+        this.decoderMaxLineLength = decoderMaxLineLength;
+    }
+
+    public TextLineDelimiter getDelimiter() {
+        return delimiter;
+    }
+
+    public void setDelimiter(TextLineDelimiter delimiter) {
+        this.delimiter = delimiter;
+    }
+
+    public boolean isAutoAppendDelimiter() {
+        return autoAppendDelimiter;
+    }
+
+    public void setAutoAppendDelimiter(boolean autoAppendDelimiter) {
+        this.autoAppendDelimiter = autoAppendDelimiter;
+    }
+
+    public String getEncoding() {
+        return encoding;
+    }
+
+    public void setEncoding(String encoding) {
+        this.encoding = encoding;
+    }
+
+    public List<ChannelHandler> getDecoders() {
+        return decoders;
+    }
+
+    public void setDecoders(List<ChannelHandler> decoders) {
+        this.decoders = decoders;
+    }
+
+    public List<ChannelHandler> getEncoders() {
+        return encoders;
+    }
+
+    public void setEncoders(List<ChannelHandler> encoders) {
+        this.encoders = encoders;
+    }
+
+    public ChannelHandler getEncoder() {
+        return encoders.isEmpty() ? null : encoders.get(0);
+    }
+
+    public void setEncoder(ChannelHandler encoder) {
+        if (!encoders.contains(encoder)) {
+            encoders.add(encoder);
+        }
+    }
+
+    public ChannelHandler getDecoder() {
+        return decoders.isEmpty() ? null : decoders.get(0);
+    }
+
+    public void setDecoder(ChannelHandler decoder) {
+        if (!decoders.contains(decoder)) {
+            decoders.add(decoder);
+        }
+    }
+
+    public boolean isDisconnect() {
+        return disconnect;
+    }
+
+    public void setDisconnect(boolean disconnect) {
+        this.disconnect = disconnect;
+    }
+
+    public boolean isLazyChannelCreation() {
+        return lazyChannelCreation;
+    }
+
+    public void setLazyChannelCreation(boolean lazyChannelCreation) {
+        this.lazyChannelCreation = lazyChannelCreation;
+    }
+
+    public boolean isTransferExchange() {
+        return transferExchange;
+    }
+
+    public void setTransferExchange(boolean transferExchange) {
+        this.transferExchange = transferExchange;
+    }
+
+    public boolean isDisconnectOnNoReply() {
+        return disconnectOnNoReply;
+    }
+
+    public void setDisconnectOnNoReply(boolean disconnectOnNoReply) {
+        this.disconnectOnNoReply = disconnectOnNoReply;
+    }
+
+    public LoggingLevel getNoReplyLogLevel() {
+        return noReplyLogLevel;
+    }
+
+    public void setNoReplyLogLevel(LoggingLevel noReplyLogLevel) {
+        this.noReplyLogLevel = noReplyLogLevel;
+    }
+
+    public LoggingLevel getServerExceptionCaughtLogLevel() {
+        return serverExceptionCaughtLogLevel;
+    }
+
+    public void setServerExceptionCaughtLogLevel(LoggingLevel serverExceptionCaughtLogLevel) {
+        this.serverExceptionCaughtLogLevel = serverExceptionCaughtLogLevel;
+    }
+
+    public LoggingLevel getServerClosedChannelExceptionCaughtLogLevel() {
+        return serverClosedChannelExceptionCaughtLogLevel;
+    }
+
+    public void setServerClosedChannelExceptionCaughtLogLevel(LoggingLevel serverClosedChannelExceptionCaughtLogLevel) {
+        this.serverClosedChannelExceptionCaughtLogLevel = serverClosedChannelExceptionCaughtLogLevel;
+    }
+
+    public boolean isAllowDefaultCodec() {
+        return allowDefaultCodec;
+    }
+
+    public void setAllowDefaultCodec(boolean allowDefaultCodec) {
+        this.allowDefaultCodec = allowDefaultCodec;
+    }
+
+    public void setClientPipelineFactory(ClientPipelineFactory clientPipelineFactory) {
+        this.clientPipelineFactory = clientPipelineFactory;
+    }
+
+    public ClientPipelineFactory getClientPipelineFactory() {
+        return clientPipelineFactory;
+    }
+
+    public int getMaximumPoolSize() {
+        return maximumPoolSize;
+    }
+
+    public void setMaximumPoolSize(int maximumPoolSize) {
+        this.maximumPoolSize = maximumPoolSize;
+    }
+
+    public boolean isOrderedThreadPoolExecutor() {
+        return orderedThreadPoolExecutor;
+    }
+
+    public void setOrderedThreadPoolExecutor(boolean orderedThreadPoolExecutor) {
+        this.orderedThreadPoolExecutor = orderedThreadPoolExecutor;
+    }
+
+    public int getProducerPoolMaxActive() {
+        return producerPoolMaxActive;
+    }
+
+    public void setProducerPoolMaxActive(int producerPoolMaxActive) {
+        this.producerPoolMaxActive = producerPoolMaxActive;
+    }
+
+    public int getProducerPoolMinIdle() {
+        return producerPoolMinIdle;
+    }
+
+    public void setProducerPoolMinIdle(int producerPoolMinIdle) {
+        this.producerPoolMinIdle = producerPoolMinIdle;
+    }
+
+    public int getProducerPoolMaxIdle() {
+        return producerPoolMaxIdle;
+    }
+
+    public void setProducerPoolMaxIdle(int producerPoolMaxIdle) {
+        this.producerPoolMaxIdle = producerPoolMaxIdle;
+    }
+
+    public long getProducerPoolMinEvictableIdle() {
+        return producerPoolMinEvictableIdle;
+    }
+
+    public void setProducerPoolMinEvictableIdle(long producerPoolMinEvictableIdle) {
+        this.producerPoolMinEvictableIdle = producerPoolMinEvictableIdle;
+    }
+
+    public boolean isProducerPoolEnabled() {
+        return producerPoolEnabled;
+    }
+
+    public void setProducerPoolEnabled(boolean producerPoolEnabled) {
+        this.producerPoolEnabled = producerPoolEnabled;
+    }
+
+    private static <T> void addToHandlersList(List<T> configured, List<T> handlers, Class<T> handlerType) {
+        if (handlers != null) {
+            for (T handler : handlers) {
+                if (handlerType.isInstance(handler)) {
+                    configured.add(handler);
+                }
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/1bb756cd/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyConstants.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyConstants.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyConstants.java
new file mode 100644
index 0000000..e381e61
--- /dev/null
+++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyConstants.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty4;
+
+/**
+ * Netty constants
+ *
+ * @version 
+ */
+public final class NettyConstants {
+
+    public static final String NETTY_CLOSE_CHANNEL_WHEN_COMPLETE = "CamelNettyCloseChannelWhenComplete";
+    public static final String NETTY_CHANNEL_HANDLER_CONTEXT = "CamelNettyChannelHandlerContext";
+    public static final String NETTY_MESSAGE_EVENT = "CamelNettyMessageEvent";
+    public static final String NETTY_REMOTE_ADDRESS = "CamelNettyRemoteAddress";
+    public static final String NETTY_LOCAL_ADDRESS = "CamelNettyLocalAddress";
+    public static final String NETTY_SSL_SESSION = "CamelNettySSLSession";
+    public static final String NETTY_SSL_CLIENT_CERT_SUBJECT_NAME = "CamelNettySSLClientCertSubjectName";
+    public static final String NETTY_SSL_CLIENT_CERT_ISSUER_NAME = "CamelNettySSLClientCertIssuerName";
+    public static final String NETTY_SSL_CLIENT_CERT_SERIAL_NO = "CamelNettySSLClientCertSerialNumber";
+    public static final String NETTY_SSL_CLIENT_CERT_NOT_BEFORE = "CamelNettySSLClientCertNotBefore";
+    public static final String NETTY_SSL_CLIENT_CERT_NOT_AFTER = "CamelNettySSLClientCertNotAfter";
+
+    private NettyConstants() {
+        // Utility class
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/1bb756cd/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyConsumer.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyConsumer.java
new file mode 100644
index 0000000..902e4d9
--- /dev/null
+++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyConsumer.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty4;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Processor;
+import org.apache.camel.impl.DefaultConsumer;
+import org.apache.camel.util.ServiceHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class NettyConsumer extends DefaultConsumer {
+    private static final Logger LOG = LoggerFactory.getLogger(NettyConsumer.class);
+    private CamelContext context;
+    private NettyConfiguration configuration;
+    private NettyServerBootstrapFactory nettyServerBootstrapFactory;
+
+    public NettyConsumer(NettyEndpoint nettyEndpoint, Processor processor, NettyConfiguration configuration) {
+        super(nettyEndpoint, processor);
+        this.context = this.getEndpoint().getCamelContext();
+        this.configuration = configuration;
+        setNettyServerBootstrapFactory(configuration.getNettyServerBootstrapFactory());
+        setExceptionHandler(new NettyConsumerExceptionHandler(this));
+    }
+
+    @Override
+    public NettyEndpoint getEndpoint() {
+        return (NettyEndpoint) super.getEndpoint();
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        super.doStart();
+
+        LOG.debug("Netty consumer binding to: {}", configuration.getAddress());
+
+        if (nettyServerBootstrapFactory == null) {
+            // setup pipeline factory
+            ServerPipelineFactory pipelineFactory;
+            ServerPipelineFactory factory = configuration.getServerPipelineFactory();
+            if (factory != null) {
+                pipelineFactory = factory.createPipelineFactory(this);
+            } else {
+                pipelineFactory = new DefaultServerPipelineFactory(this);
+            }
+
+            if (isTcp()) {
+                nettyServerBootstrapFactory = new SingleTCPNettyServerBootstrapFactory();
+            } else {
+                nettyServerBootstrapFactory = new SingleUDPNettyServerBootstrapFactory();
+            }
+            nettyServerBootstrapFactory.init(context, configuration, pipelineFactory);
+        }
+
+        ServiceHelper.startServices(nettyServerBootstrapFactory);
+
+        LOG.info("Netty consumer bound to: " + configuration.getAddress());
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        LOG.debug("Netty consumer unbinding from: {}", configuration.getAddress());
+
+        ServiceHelper.stopService(nettyServerBootstrapFactory);
+
+        LOG.info("Netty consumer unbound from: " + configuration.getAddress());
+
+        super.doStop();
+    }
+
+    public CamelContext getContext() {
+        return context;
+    }
+
+    public NettyConfiguration getConfiguration() {
+        return configuration;
+    }
+
+    public void setConfiguration(NettyConfiguration configuration) {
+        this.configuration = configuration;
+    }
+
+    public NettyServerBootstrapFactory getNettyServerBootstrapFactory() {
+        return nettyServerBootstrapFactory;
+    }
+
+    public void setNettyServerBootstrapFactory(NettyServerBootstrapFactory nettyServerBootstrapFactory) {
+        this.nettyServerBootstrapFactory = nettyServerBootstrapFactory;
+    }
+
+    protected boolean isTcp() {
+        return configuration.getProtocol().equalsIgnoreCase("tcp");
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/1bb756cd/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyConsumerExceptionHandler.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyConsumerExceptionHandler.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyConsumerExceptionHandler.java
new file mode 100644
index 0000000..75b1aad
--- /dev/null
+++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyConsumerExceptionHandler.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty4;
+
+import java.nio.channels.ClosedChannelException;
+
+import org.apache.camel.CamelExchangeException;
+import org.apache.camel.Exchange;
+import org.apache.camel.LoggingLevel;
+import org.apache.camel.spi.ExceptionHandler;
+import org.apache.camel.util.CamelLogger;
+import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class NettyConsumerExceptionHandler implements ExceptionHandler {
+
+    private static final Logger LOG = LoggerFactory.getLogger(NettyConsumer.class);
+    private final CamelLogger logger;
+    private final LoggingLevel closedLoggingLevel;
+
+    public NettyConsumerExceptionHandler(NettyConsumer consumer) {
+        this.logger = new CamelLogger(LOG, consumer.getConfiguration().getServerExceptionCaughtLogLevel());
+        this.closedLoggingLevel = consumer.getConfiguration().getServerClosedChannelExceptionCaughtLogLevel();
+    }
+
+    @Override
+    public void handleException(Throwable exception) {
+        handleException(null, null, exception);
+    }
+
+    @Override
+    public void handleException(String message, Throwable exception) {
+        handleException(message, null, exception);
+    }
+
+    @Override
+    public void handleException(String message, Exchange exchange, Throwable exception) {
+        try {
+            String msg = CamelExchangeException.createExceptionMessage(message, exchange, exception);
+            boolean closed = ObjectHelper.getException(ClosedChannelException.class, exception) != null;
+            if (closed) {
+                logger.log(msg, exception, closedLoggingLevel);
+            } else {
+                logger.log(msg, exception);
+            }
+        } catch (Throwable e) {
+            // the logging exception handler must not cause new exceptions to occur
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/1bb756cd/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyConverter.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyConverter.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyConverter.java
new file mode 100644
index 0000000..aa441d3
--- /dev/null
+++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyConverter.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty4;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInput;
+import java.io.ObjectInputStream;
+import java.io.UnsupportedEncodingException;
+import javax.xml.transform.dom.DOMSource;
+import javax.xml.transform.sax.SAXSource;
+import javax.xml.transform.stax.StAXSource;
+import javax.xml.transform.stream.StreamSource;
+
+import org.w3c.dom.Document;
+
+import org.apache.camel.Converter;
+import org.apache.camel.Exchange;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBufferInputStream;
+import org.jboss.netty.buffer.ChannelBuffers;
+
+/**
+ * A set of converter methods for working with Netty types
+ *
+ * @version 
+ */
+@Converter
+public final class NettyConverter {
+
+    private NettyConverter() {
+        //Utility Class
+    }
+
+    @Converter
+    public static byte[] toByteArray(ChannelBuffer buffer, Exchange exchange) {
+        return buffer.array();
+    }
+
+    @Converter
+    public static String toString(ChannelBuffer buffer, Exchange exchange) throws UnsupportedEncodingException {
+        byte[] bytes = toByteArray(buffer, exchange);
+        // use type converter as it can handle encoding set on the Exchange
+        if (exchange != null) {
+            return exchange.getContext().getTypeConverter().convertTo(String.class, exchange, bytes);
+        }
+        return new String(bytes, "UTF-8");
+    }
+
+    @Converter
+    public static InputStream toInputStream(ChannelBuffer buffer, Exchange exchange) {
+        return new ChannelBufferInputStream(buffer);
+    }
+
+    @Converter
+    public static ObjectInput toObjectInput(ChannelBuffer buffer, Exchange exchange) throws IOException {
+        InputStream is = toInputStream(buffer, exchange);
+        return new ObjectInputStream(is);
+    }
+
+    @Converter
+    public static ChannelBuffer toByteBuffer(byte[] bytes, Exchange exchange) {
+        ChannelBuffer buf = ChannelBuffers.dynamicBuffer(bytes.length);
+        buf.writeBytes(bytes);
+        return buf;
+    }
+
+    @Converter
+    public static ChannelBuffer toByteBuffer(String s, Exchange exchange) {
+        byte[] bytes;
+        if (exchange != null) {
+            // use type converter as it can handle encoding set on the Exchange
+            bytes = exchange.getContext().getTypeConverter().convertTo(byte[].class, exchange, s);
+        } else {
+            bytes = s.getBytes();
+        }
+        return toByteBuffer(bytes, exchange);
+    }
+
+    @Converter
+    public static Document toDocument(ChannelBuffer buffer, Exchange exchange) {
+        InputStream is = toInputStream(buffer, exchange);
+        return exchange.getContext().getTypeConverter().convertTo(Document.class, exchange, is);
+    }
+
+    @Converter
+    public static DOMSource toDOMSource(ChannelBuffer buffer, Exchange exchange) {
+        InputStream is = toInputStream(buffer, exchange);
+        return exchange.getContext().getTypeConverter().convertTo(DOMSource.class, exchange, is);
+    }
+
+    @Converter
+    public static SAXSource toSAXSource(ChannelBuffer buffer, Exchange exchange) {
+        InputStream is = toInputStream(buffer, exchange);
+        return exchange.getContext().getTypeConverter().convertTo(SAXSource.class, exchange, is);
+    }
+
+    @Converter
+    public static StreamSource toStreamSource(ChannelBuffer buffer, Exchange exchange) {
+        InputStream is = toInputStream(buffer, exchange);
+        return exchange.getContext().getTypeConverter().convertTo(StreamSource.class, exchange, is);
+    }
+
+    @Converter
+    public static StAXSource toStAXSource(ChannelBuffer buffer, Exchange exchange) {
+        InputStream is = toInputStream(buffer, exchange);
+        return exchange.getContext().getTypeConverter().convertTo(StAXSource.class, exchange, is);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/1bb756cd/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyEndpoint.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyEndpoint.java
new file mode 100644
index 0000000..3bc9e90
--- /dev/null
+++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyEndpoint.java
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty4;
+
+import java.math.BigInteger;
+import java.security.Principal;
+import javax.net.ssl.SSLPeerUnverifiedException;
+import javax.net.ssl.SSLSession;
+import javax.security.cert.X509Certificate;
+
+import org.apache.camel.Consumer;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.impl.DefaultEndpoint;
+import org.apache.camel.impl.SynchronousDelegateProducer;
+import org.apache.camel.util.ObjectHelper;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.handler.ssl.SslHandler;
+import org.jboss.netty.util.Timer;
+
+public class NettyEndpoint extends DefaultEndpoint {
+    private NettyConfiguration configuration;
+    private Timer timer;
+
+    public NettyEndpoint(String endpointUri, NettyComponent component, NettyConfiguration configuration) {
+        super(endpointUri, component);
+        this.configuration = configuration;
+    }
+
+    public Consumer createConsumer(Processor processor) throws Exception {
+        Consumer answer = new NettyConsumer(this, processor, configuration);
+        configureConsumer(answer);
+        return answer;
+    }
+
+    public Producer createProducer() throws Exception {
+        Producer answer = new NettyProducer(this, configuration);
+        if (isSynchronous()) {
+            return new SynchronousDelegateProducer(answer);
+        } else {
+            return answer;
+        }
+    }
+
+    public Exchange createExchange(ChannelHandlerContext ctx, MessageEvent messageEvent) throws Exception {
+        Exchange exchange = createExchange();
+        updateMessageHeader(exchange.getIn(), ctx, messageEvent);
+        NettyPayloadHelper.setIn(exchange, messageEvent.getMessage());
+        return exchange;
+    }
+    
+    public boolean isSingleton() {
+        return true;
+    }
+
+    @Override
+    public NettyComponent getComponent() {
+        return (NettyComponent) super.getComponent();
+    }
+
+    public NettyConfiguration getConfiguration() {
+        return configuration;
+    }
+
+    public void setConfiguration(NettyConfiguration configuration) {
+        this.configuration = configuration;
+    }
+
+    public void setTimer(Timer timer) {
+        this.timer = timer;
+    }
+
+    public Timer getTimer() {
+        return timer;
+    }
+
+    @Override
+    protected String createEndpointUri() {
+        ObjectHelper.notNull(configuration, "configuration");
+        return "netty4:" + getConfiguration().getProtocol() + "://" + getConfiguration().getHost() + ":" + getConfiguration().getPort();
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        ObjectHelper.notNull(timer, "timer");
+    }
+    
+    protected SSLSession getSSLSession(ChannelHandlerContext ctx) {
+        final SslHandler sslHandler = ctx.getPipeline().get(SslHandler.class);
+        SSLSession sslSession = null;
+        if (sslHandler != null) {
+            sslSession = sslHandler.getEngine().getSession();
+        } 
+        return sslSession;
+    }
+
+    protected void updateMessageHeader(Message in, ChannelHandlerContext ctx, MessageEvent messageEvent) {
+        in.setHeader(NettyConstants.NETTY_CHANNEL_HANDLER_CONTEXT, ctx);
+        in.setHeader(NettyConstants.NETTY_MESSAGE_EVENT, messageEvent);
+        in.setHeader(NettyConstants.NETTY_REMOTE_ADDRESS, messageEvent.getRemoteAddress());
+        in.setHeader(NettyConstants.NETTY_LOCAL_ADDRESS, messageEvent.getChannel().getLocalAddress());
+
+        if (configuration.isSsl()) {
+            // setup the SslSession header
+            SSLSession sslSession = getSSLSession(ctx);
+            in.setHeader(NettyConstants.NETTY_SSL_SESSION, sslSession);
+
+            // enrich headers with details from the client certificate if option is enabled
+            if (configuration.isSslClientCertHeaders()) {
+                enrichWithClientCertInformation(sslSession, in);
+            }
+        }
+    }
+
+    /**
+     * Enriches the message with client certificate details such as subject name, serial number etc.
+     * <p/>
+     * If the certificate is unverified then the headers is not enriched.
+     *
+     * @param sslSession  the SSL session
+     * @param message     the message to enrich
+     */
+    protected void enrichWithClientCertInformation(SSLSession sslSession, Message message) {
+        try {
+            X509Certificate[] certificates = sslSession.getPeerCertificateChain();
+            if (certificates != null && certificates.length > 0) {
+                X509Certificate cert = certificates[0];
+
+                Principal subject = cert.getSubjectDN();
+                if (subject != null) {
+                    message.setHeader(NettyConstants.NETTY_SSL_CLIENT_CERT_SUBJECT_NAME, subject.getName());
+                }
+                Principal issuer = cert.getIssuerDN();
+                if (issuer != null) {
+                    message.setHeader(NettyConstants.NETTY_SSL_CLIENT_CERT_ISSUER_NAME, issuer.getName());
+                }
+                BigInteger serial = cert.getSerialNumber();
+                if (serial != null) {
+                    message.setHeader(NettyConstants.NETTY_SSL_CLIENT_CERT_SERIAL_NO, serial.toString());
+                }
+                message.setHeader(NettyConstants.NETTY_SSL_CLIENT_CERT_NOT_BEFORE, cert.getNotBefore());
+                message.setHeader(NettyConstants.NETTY_SSL_CLIENT_CERT_NOT_AFTER, cert.getNotAfter());
+            }
+        } catch (SSLPeerUnverifiedException e) {
+            // ignore
+        }
+    }
+
+}
\ No newline at end of file