You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2013/08/07 15:45:55 UTC
[6/6] git commit: CAMEL-6555: camel-netty4 for Netty 4.x based
component. Work in progress.
CAMEL-6555: camel-netty4 for Netty 4.x based component. Work in progress.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/1bb756cd
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/1bb756cd
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/1bb756cd
Branch: refs/heads/master
Commit: 1bb756cd23c31811f8c1278aa8f6a38a2ea0abac
Parents: 20adc69
Author: Claus Ibsen <da...@apache.org>
Authored: Wed Aug 7 15:45:36 2013 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Wed Aug 7 15:45:36 2013 +0200
----------------------------------------------------------------------
components/camel-netty4/pom.xml | 77 +++
.../netty4/CamelNettyThreadNameDeterminer.java | 36 ++
.../netty4/ChannelHandlerFactories.java | 80 +++
.../component/netty4/ChannelHandlerFactory.java | 35 ++
.../component/netty4/ClientPipelineFactory.java | 41 ++
.../netty4/DefaultClientPipelineFactory.java | 162 ++++++
.../netty4/DefaultServerPipelineFactory.java | 182 +++++++
.../camel/component/netty4/NettyCamelState.java | 46 ++
.../netty4/NettyClientBossPoolBuilder.java | 68 +++
.../camel/component/netty4/NettyComponent.java | 144 +++++
.../component/netty4/NettyConfiguration.java | 432 +++++++++++++++
.../camel/component/netty4/NettyConstants.java | 42 ++
.../camel/component/netty4/NettyConsumer.java | 109 ++++
.../netty4/NettyConsumerExceptionHandler.java | 66 +++
.../camel/component/netty4/NettyConverter.java | 124 +++++
.../camel/component/netty4/NettyEndpoint.java | 166 ++++++
.../camel/component/netty4/NettyHelper.java | 118 ++++
.../component/netty4/NettyPayloadHelper.java | 76 +++
.../camel/component/netty4/NettyProducer.java | 535 +++++++++++++++++++
.../NettyServerBootstrapConfiguration.java | 450 ++++++++++++++++
.../netty4/NettyServerBootstrapFactory.java | 73 +++
.../netty4/NettyServerBossPoolBuilder.java | 67 +++
.../netty4/NettyWorkerPoolBuilder.java | 80 +++
.../component/netty4/ServerPipelineFactory.java | 41 ++
.../netty4/ShareableChannelHandlerFactory.java | 36 ++
.../netty4/SharedSingletonObjectPool.java | 86 +++
.../SingleTCPNettyServerBootstrapFactory.java | 177 ++++++
.../SingleUDPNettyServerBootstrapFactory.java | 182 +++++++
.../component/netty4/TextLineDelimiter.java | 26 +
.../netty4/handlers/ClientChannelHandler.java | 211 ++++++++
.../netty4/handlers/ServerChannelHandler.java | 206 +++++++
.../handlers/ServerResponseFutureListener.java | 77 +++
.../component/netty4/ssl/SSLEngineFactory.java | 116 ++++
.../src/main/resources/META-INF/LICENSE.txt | 203 +++++++
.../src/main/resources/META-INF/NOTICE.txt | 11 +
.../services/org/apache/camel/TypeConverter | 18 +
.../services/org/apache/camel/component/netty4 | 17 +
.../camel-netty4/src/test/data/message1.txt | 1 +
.../camel/component/netty4/BaseNettyTest.java | 95 ++++
.../netty4/MultipleCodecsSpringTest.java | 43 ++
.../component/netty4/MultipleCodecsTest.java | 86 +++
.../component/netty4/Netty2978IssueTest.java | 123 +++++
.../component/netty4/NettyBacklogTest.java | 46 ++
.../NettyComponentWithConfigurationTest.java | 57 ++
.../component/netty4/NettyConcurrentTest.java | 103 ++++
.../component/netty4/NettyConverterTest.java | 60 +++
.../NettyCustomPipelineFactoryAsynchTest.java | 128 +++++
.../NettyCustomPipelineFactorySynchTest.java | 130 +++++
.../component/netty4/NettyDisconnectTest.java | 49 ++
.../component/netty4/NettyFileTcpTest.java | 50 ++
.../NettyInOutCloseChannelWhenCompleteTest.java | 50 ++
.../netty4/NettyInOutFromSedaTest.java | 56 ++
.../NettyInOutWithForcedNoResponseTest.java | 55 ++
.../netty4/NettyManualEndpointTest.java | 79 +++
.../camel/component/netty4/NettyOptionTest.java | 46 ++
.../netty4/NettyProducerAsyncEndpointTest.java | 76 +++
.../netty4/NettyProducerPoolDisabledTest.java | 50 ++
.../camel/component/netty4/NettyProxyTest.java | 61 +++
.../netty4/NettyRequestTimeoutTest.java | 84 +++
.../netty4/NettyReuseConnectionTest.java | 46 ++
.../component/netty4/NettySSLClasspathTest.java | 57 ++
.../netty4/NettySSLClientCertHeadersTest.java | 74 +++
.../netty4/NettySSLContextParametersTest.java | 92 ++++
.../camel/component/netty4/NettySSLTest.java | 78 +++
.../component/netty4/NettySingleCodecTest.java | 62 +++
.../component/netty4/NettyTCPAsyncTest.java | 75 +++
.../netty4/NettyTCPSyncNotLazyChannelTest.java | 62 +++
.../component/netty4/NettyTCPSyncTest.java | 75 +++
.../NettyTcpWithInOutUsingPlainSocketTest.java | 143 +++++
.../NettyTextlineInOnlyNullDelimiterTest.java | 48 ++
.../netty4/NettyTextlineInOnlyTest.java | 66 +++
.../NettyTextlineInOutNonBlockingTest.java | 87 +++
.../NettyTextlineInOutSynchronousFalseTest.java | 72 +++
.../NettyTextlineInOutSynchronousTest.java | 72 +++
.../netty4/NettyTextlineInOutTest.java | 50 ++
.../netty4/NettyTransferExchangeOptionTest.java | 128 +++++
.../component/netty4/NettyUDPAsyncTest.java | 74 +++
.../netty4/NettyUDPLargeMessageInOnlyTest.java | 68 +++
.../netty4/NettyUDPObjectSyncTest.java | 50 ++
.../component/netty4/NettyUDPSyncTest.java | 52 ++
.../NettyUdpWithInOutUsingPlainSocketTest.java | 80 +++
...UseSharedWorkerThreadPoolManyRoutesTest.java | 84 +++
.../NettyUseSharedWorkerThreadPoolTest.java | 103 ++++
.../apache/camel/component/netty4/Poetry.java | 57 ++
...pringNettyUseSharedWorkerThreadPoolTest.java | 51 ++
.../netty4/UnsharableCodecsConflicts2Test.java | 123 +++++
.../netty4/UnsharableCodecsConflictsTest.java | 129 +++++
.../src/test/resources/keystore.jks | Bin 0 -> 1473 bytes
.../src/test/resources/log4j.properties | 38 ++
...SpringNettyUseSharedWorkerThreadPoolTest.xml | 52 ++
.../camel/component/netty4/multiple-codecs.xml | 72 +++
.../camel-netty4/src/test/resources/test.txt | 19 +
92 files changed, 8483 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/1bb756cd/components/camel-netty4/pom.xml
----------------------------------------------------------------------
diff --git a/components/camel-netty4/pom.xml b/components/camel-netty4/pom.xml
new file mode 100644
index 0000000..62ac47d
--- /dev/null
+++ b/components/camel-netty4/pom.xml
@@ -0,0 +1,77 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version
+ 2.0 (the "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0 Unless required by
+ applicable law or agreed to in writing, software distributed under the
+ License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ CONDITIONS OF ANY KIND, either express or implied. See the License for
+ the specific language governing permissions and limitations under the
+ License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>components</artifactId>
+ <version>2.12-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>camel-netty4</artifactId>
+ <packaging>bundle</packaging>
+ <name>Camel :: Netty</name>
+ <description>Camel Netty 4.x NIO based socket communication component</description>
+
+ <properties>
+ <camel.osgi.export.pkg>
+ org.apache.camel.component.netty4.*
+ </camel.osgi.export.pkg>
+ <camel.osgi.export.service>org.apache.camel.spi.ComponentResolver;component=netty4</camel.osgi.export.service>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-core</artifactId>
+ </dependency>
+ <!-- TODO: use netty-all and ${netty-version} when upgrading to Netty 4.x -->
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty</artifactId>
+ <version>${netty3-version}</version>
+ </dependency>
+ <dependency>
+ <groupId>commons-pool</groupId>
+ <artifactId>commons-pool</artifactId>
+ <version>${commons-pool-version}</version>
+ </dependency>
+
+ <!-- testing -->
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-test-spring</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <!-- logging -->
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ </dependencies>
+
+</project>
http://git-wip-us.apache.org/repos/asf/camel/blob/1bb756cd/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/CamelNettyThreadNameDeterminer.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/CamelNettyThreadNameDeterminer.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/CamelNettyThreadNameDeterminer.java
new file mode 100644
index 0000000..9191d4f
--- /dev/null
+++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/CamelNettyThreadNameDeterminer.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty4;
+
+import org.apache.camel.util.concurrent.ThreadHelper;
+import org.jboss.netty.util.ThreadNameDeterminer;
+
+public class CamelNettyThreadNameDeterminer implements ThreadNameDeterminer { // resolves thread names through Camel's ThreadHelper
+
+ private final String pattern;
+ private final String name;
+
+ public CamelNettyThreadNameDeterminer(String pattern, String name) {
+ this.pattern = pattern;
+ this.name = name;
+ }
+
+ @Override
+ public String determineThreadName(String currentThreadName, String proposedThreadName) throws Exception {
+ return ThreadHelper.resolveThreadName(pattern, name); // currentThreadName and proposedThreadName are not used
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/1bb756cd/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/ChannelHandlerFactories.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/ChannelHandlerFactories.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/ChannelHandlerFactories.java
new file mode 100644
index 0000000..53e961f
--- /dev/null
+++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/ChannelHandlerFactories.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty4;
+
+import java.nio.charset.Charset;
+
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.channel.ChannelHandler;
+import org.jboss.netty.handler.codec.frame.DelimiterBasedFrameDecoder;
+import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder;
+import org.jboss.netty.handler.codec.serialization.ClassResolvers;
+import org.jboss.netty.handler.codec.serialization.ObjectDecoder;
+import org.jboss.netty.handler.codec.serialization.ObjectEncoder;
+import org.jboss.netty.handler.codec.string.StringDecoder;
+import org.jboss.netty.handler.codec.string.StringEncoder;
+
+/**
+ * Helper to create commonly used {@link ChannelHandlerFactory} instances for string, object and frame codecs.
+ */
+public final class ChannelHandlerFactories {
+
+ private ChannelHandlerFactories() { // static factory methods only; not instantiable from outside
+ }
+
+ public static ChannelHandlerFactory newStringEncoder(Charset charset) {
+ return new ShareableChannelHandlerFactory(new StringEncoder(charset)); // one encoder instance is wrapped
+ }
+
+ public static ChannelHandlerFactory newStringDecoder(Charset charset) {
+ return new ShareableChannelHandlerFactory(new StringDecoder(charset)); // one decoder instance is wrapped
+ }
+
+ public static ChannelHandlerFactory newObjectDecoder() {
+ return new ChannelHandlerFactory() {
+ @Override
+ public ChannelHandler newChannelHandler() {
+ return new ObjectDecoder(ClassResolvers.weakCachingResolver(null)); // a new decoder on every call
+ }
+ };
+ }
+
+ public static ChannelHandlerFactory newObjectEncoder() {
+ return new ShareableChannelHandlerFactory(new ObjectEncoder()); // one encoder instance is wrapped
+ }
+
+ public static ChannelHandlerFactory newDelimiterBasedFrameDecoder(final int maxFrameLength, final ChannelBuffer[] delimiters) {
+ return new ChannelHandlerFactory() {
+ @Override
+ public ChannelHandler newChannelHandler() {
+ return new DelimiterBasedFrameDecoder(maxFrameLength, true, delimiters); // NOTE(review): 'true' is presumably Netty 3's stripDelimiter flag - confirm
+ }
+ };
+ }
+
+ public static ChannelHandlerFactory newLengthFieldBasedFrameDecoder(final int maxFrameLength, final int lengthFieldOffset,
+ final int lengthFieldLength, final int lengthAdjustment,
+ final int initialBytesToStrip) {
+ return new ChannelHandlerFactory() {
+ @Override
+ public ChannelHandler newChannelHandler() {
+ return new LengthFieldBasedFrameDecoder(maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip); // a new decoder on every call
+ }
+ };
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/1bb756cd/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/ChannelHandlerFactory.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/ChannelHandlerFactory.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/ChannelHandlerFactory.java
new file mode 100644
index 0000000..d05323d
--- /dev/null
+++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/ChannelHandlerFactory.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty4;
+
+import org.jboss.netty.channel.ChannelHandler;
+
+/**
+ * Factory for creating new {@link ChannelHandler} instances, used for non-shareable
+ * encoders and decoders configured on the Camel {@link NettyComponent}.
+ * <p/>
+ * This is needed as Netty's {@link ChannelHandler} is often not shareable
+ * and therefore a new instance must be created when a handler is being
+ * added to a pipeline.
+ */
+public interface ChannelHandlerFactory extends ChannelHandler {
+
+ /**
+ * Creates a new {@link ChannelHandler} to be used in a pipeline.
+ */
+ ChannelHandler newChannelHandler();
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/1bb756cd/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/ClientPipelineFactory.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/ClientPipelineFactory.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/ClientPipelineFactory.java
new file mode 100644
index 0000000..9921b31
--- /dev/null
+++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/ClientPipelineFactory.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty4;
+
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+
+/**
+ * Factory to create {@link ChannelPipeline} for clients, eg {@link NettyProducer}.
+ * <p/>
+ * Implementors must support creating a new instance of this factory which is associated
+ * to the given {@link NettyProducer} using the {@link #createPipelineFactory(NettyProducer)}
+ * method.
+ *
+ * @see ChannelPipelineFactory
+ */
+public abstract class ClientPipelineFactory implements ChannelPipelineFactory {
+
+ /**
+ * Creates a new {@link ClientPipelineFactory} using the given {@link NettyProducer}
+ *
+ * @param producer the associated producer
+ * @return the {@link ClientPipelineFactory} associated to the given producer.
+ */
+ public abstract ClientPipelineFactory createPipelineFactory(NettyProducer producer);
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/1bb756cd/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/DefaultClientPipelineFactory.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/DefaultClientPipelineFactory.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/DefaultClientPipelineFactory.java
new file mode 100644
index 0000000..e80f96a
--- /dev/null
+++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/DefaultClientPipelineFactory.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty4;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+
+import org.apache.camel.component.netty4.handlers.ClientChannelHandler;
+import org.apache.camel.component.netty4.ssl.SSLEngineFactory;
+import org.apache.camel.util.ObjectHelper;
+import org.jboss.netty.channel.ChannelHandler;
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.handler.ssl.SslHandler;
+import org.jboss.netty.handler.timeout.ReadTimeoutHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DefaultClientPipelineFactory extends ClientPipelineFactory {
+ private static final Logger LOG = LoggerFactory.getLogger(DefaultClientPipelineFactory.class);
+
+ private final NettyProducer producer;
+ private SSLContext sslContext;
+
+ public DefaultClientPipelineFactory(NettyProducer producer) {
+ this.producer = producer;
+ try {
+ this.sslContext = createSSLContext(producer); // stays null unless SSL is enabled and SSLContextParameters are configured
+ } catch (Exception e) {
+ throw ObjectHelper.wrapRuntimeCamelException(e); // rethrown through Camel's ObjectHelper wrapper
+ }
+ }
+
+ public ChannelPipeline getPipeline() throws Exception {
+ // every call builds a fresh pipeline: ssl (optional), decoders, encoders, timeout (optional), then the Camel handler
+ ChannelPipeline channelPipeline = Channels.pipeline();
+
+ SslHandler sslHandler = configureClientSSLOnDemand();
+ if (sslHandler != null) {
+ // must close the channel on SSL exception
+ sslHandler.setCloseOnSSLException(true);
+ LOG.debug("Client SSL handler configured and added to the ChannelPipeline: {}", sslHandler);
+ addToPipeline("ssl", channelPipeline, sslHandler);
+ }
+
+ List<ChannelHandler> decoders = producer.getConfiguration().getDecoders();
+ for (int x = 0; x < decoders.size(); x++) {
+ ChannelHandler decoder = decoders.get(x);
+ if (decoder instanceof ChannelHandlerFactory) {
+ // use the factory to create a new instance of the channel handler as it may not be shareable
+ decoder = ((ChannelHandlerFactory) decoder).newChannelHandler();
+ }
+ addToPipeline("decoder-" + x, channelPipeline, decoder);
+ }
+
+ List<ChannelHandler> encoders = producer.getConfiguration().getEncoders();
+ for (int x = 0; x < encoders.size(); x++) {
+ ChannelHandler encoder = encoders.get(x);
+ if (encoder instanceof ChannelHandlerFactory) {
+ // use the factory to create a new instance of the channel handler as it may not be shareable
+ encoder = ((ChannelHandlerFactory) encoder).newChannelHandler();
+ }
+ addToPipeline("encoder-" + x, channelPipeline, encoder);
+ }
+
+ // a request timeout greater than zero adds a read timeout handler (the value is in milliseconds)
+ if (producer.getConfiguration().getRequestTimeout() > 0) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Using request timeout {} millis", producer.getConfiguration().getRequestTimeout());
+ }
+ ChannelHandler timeout = new ReadTimeoutHandler(NettyComponent.getTimer(), producer.getConfiguration().getRequestTimeout(), TimeUnit.MILLISECONDS);
+ addToPipeline("timeout", channelPipeline, timeout);
+ }
+
+ // our handler must be added last
+ addToPipeline("handler", channelPipeline, new ClientChannelHandler(producer));
+
+ LOG.trace("Created ChannelPipeline: {}", channelPipeline);
+ return channelPipeline;
+ }
+
+ private void addToPipeline(String name, ChannelPipeline pipeline, ChannelHandler handler) {
+ pipeline.addLast(name, handler); // appended at the end, so the call order in getPipeline() is the handler order
+ }
+
+ private SSLContext createSSLContext(NettyProducer producer) throws Exception {
+ if (!producer.getConfiguration().isSsl()) {
+ return null;
+ }
+
+ // create the ssl context once (called from the constructor); SSL engines are created from it per pipeline
+ if (producer.getConfiguration().getSslContextParameters() != null) {
+ SSLContext context = producer.getConfiguration().getSslContextParameters().createSSLContext();
+ return context;
+ }
+
+ return null; // SSL without SSLContextParameters: configureClientSSLOnDemand() uses the ssl handler or keystore settings
+ }
+
+ /**
+ * Creates the client-side {@link SslHandler}, or returns null when SSL is not enabled.
+ * Precedence: the configured ssl handler, then the shared {@link SSLContext}, then the keystore settings.
+ */
+ private SslHandler configureClientSSLOnDemand() throws Exception {
+ if (!producer.getConfiguration().isSsl()) {
+ return null;
+ }
+
+ if (producer.getConfiguration().getSslHandler() != null) {
+ return producer.getConfiguration().getSslHandler();
+ } else if (sslContext != null) {
+ SSLEngine engine = sslContext.createSSLEngine();
+ engine.setUseClientMode(true);
+ return new SslHandler(engine);
+ } else {
+ if (producer.getConfiguration().getKeyStoreFile() == null && producer.getConfiguration().getKeyStoreResource() == null) {
+ LOG.debug("keystorefile is null");
+ }
+ if (producer.getConfiguration().getTrustStoreFile() == null && producer.getConfiguration().getTrustStoreResource() == null) {
+ LOG.debug("truststorefile is null");
+ }
+ // toCharArray() never returns null, so check the passphrase itself; a missing passphrase is passed on as null
+ if (producer.getConfiguration().getPassphrase() == null) {
+ LOG.debug("passphrase is null");
+ }
+ char[] pw = producer.getConfiguration().getPassphrase() != null ? producer.getConfiguration().getPassphrase().toCharArray() : null;
+ SSLEngineFactory sslEngineFactory;
+ if (producer.getConfiguration().getKeyStoreFile() != null || producer.getConfiguration().getTrustStoreFile() != null) {
+ sslEngineFactory = new SSLEngineFactory(
+ producer.getConfiguration().getKeyStoreFormat(), producer.getConfiguration().getSecurityProvider(),
+ producer.getConfiguration().getKeyStoreFile(), producer.getConfiguration().getTrustStoreFile(), pw);
+ } else {
+ sslEngineFactory = new SSLEngineFactory(producer.getContext().getClassResolver(),
+ producer.getConfiguration().getKeyStoreFormat(), producer.getConfiguration().getSecurityProvider(),
+ producer.getConfiguration().getKeyStoreResource(), producer.getConfiguration().getTrustStoreResource(), pw);
+ }
+ SSLEngine sslEngine = sslEngineFactory.createClientSSLEngine();
+ return new SslHandler(sslEngine);
+ }
+ }
+
+ @Override
+ public ClientPipelineFactory createPipelineFactory(NettyProducer producer) {
+ return new DefaultClientPipelineFactory(producer); // a separate factory instance for the given producer
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/1bb756cd/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/DefaultServerPipelineFactory.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/DefaultServerPipelineFactory.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/DefaultServerPipelineFactory.java
new file mode 100644
index 0000000..da457c4
--- /dev/null
+++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/DefaultServerPipelineFactory.java
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty4;
+
+import java.util.List;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+
+import org.apache.camel.component.netty4.handlers.ServerChannelHandler;
+import org.apache.camel.component.netty4.ssl.SSLEngineFactory;
+import org.apache.camel.util.ObjectHelper;
+import org.jboss.netty.channel.ChannelHandler;
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.handler.execution.ExecutionHandler;
+import org.jboss.netty.handler.ssl.SslHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DefaultServerPipelineFactory extends ServerPipelineFactory {
+ private static final Logger LOG = LoggerFactory.getLogger(DefaultServerPipelineFactory.class);
+
+ private NettyConsumer consumer;
+ private SSLContext sslContext;
+
+ public DefaultServerPipelineFactory(NettyServerBootstrapConfiguration configuration) {
+ this.consumer = null; // NOTE(review): getPipeline() dereferences consumer; presumably createPipelineFactory(NettyConsumer) must be used first - confirm
+ try {
+ this.sslContext = createSSLContext(configuration); // stays null unless SSL is enabled and SSLContextParameters are configured
+ } catch (Exception e) {
+ throw ObjectHelper.wrapRuntimeCamelException(e);
+ }
+
+ if (sslContext != null) {
+ LOG.info("Created SslContext {}", sslContext);
+ }
+ }
+
+ public DefaultServerPipelineFactory(NettyConsumer consumer) {
+ this.consumer = consumer;
+ try {
+ this.sslContext = createSSLContext(consumer.getConfiguration()); // stays null unless SSL is enabled and SSLContextParameters are configured
+ } catch (Exception e) {
+ throw ObjectHelper.wrapRuntimeCamelException(e); // rethrown through Camel's ObjectHelper wrapper
+ }
+
+ if (sslContext != null) {
+ LOG.info("Created SslContext {}", sslContext);
+ }
+ }
+
+ @Override
+ public ChannelPipeline getPipeline() throws Exception {
+ ChannelPipeline channelPipeline = Channels.pipeline(); // order: ssl (optional), encoders, decoders, executionHandler (optional), handler
+
+ SslHandler sslHandler = configureServerSSLOnDemand();
+ if (sslHandler != null) {
+ // must close the channel on SSL exception
+ sslHandler.setCloseOnSSLException(true);
+ LOG.debug("Server SSL handler configured and added as an interceptor against the ChannelPipeline: {}", sslHandler);
+ addToPipeline("ssl", channelPipeline, sslHandler);
+ }
+
+ List<ChannelHandler> encoders = consumer.getConfiguration().getEncoders();
+ for (int x = 0; x < encoders.size(); x++) {
+ ChannelHandler encoder = encoders.get(x);
+ if (encoder instanceof ChannelHandlerFactory) {
+ // use the factory to create a new instance of the channel handler as it may not be shareable
+ encoder = ((ChannelHandlerFactory) encoder).newChannelHandler();
+ }
+ addToPipeline("encoder-" + x, channelPipeline, encoder);
+ }
+
+ List<ChannelHandler> decoders = consumer.getConfiguration().getDecoders();
+ for (int x = 0; x < decoders.size(); x++) {
+ ChannelHandler decoder = decoders.get(x);
+ if (decoder instanceof ChannelHandlerFactory) {
+ // use the factory to create a new instance of the channel handler as it may not be shareable
+ decoder = ((ChannelHandlerFactory) decoder).newChannelHandler();
+ }
+ addToPipeline("decoder-" + x, channelPipeline, decoder);
+ }
+
+ if (consumer.getConfiguration().isOrderedThreadPoolExecutor()) {
+ // this must be added just before the ServerChannelHandler.
+ // use an ordered thread pool to ensure we process the events in order, and can send back
+ // replies in the expected order, e.g. this is required by TCP.
+ // and use a Camel thread factory so we have consistent thread naming
+ ExecutionHandler executionHandler = new ExecutionHandler(consumer.getEndpoint().getComponent().getExecutorService());
+ addToPipeline("executionHandler", channelPipeline, executionHandler);
+ LOG.debug("Using OrderedMemoryAwareThreadPoolExecutor with core pool size: {}", consumer.getConfiguration().getMaximumPoolSize()); // NOTE(review): text says "core" but the value is getMaximumPoolSize() - confirm
+ }
+
+ // our handler must be added last
+ addToPipeline("handler", channelPipeline, new ServerChannelHandler(consumer));
+
+ LOG.trace("Created ChannelPipeline: {}", channelPipeline);
+ return channelPipeline;
+ }
+
+ private void addToPipeline(String name, ChannelPipeline pipeline, ChannelHandler handler) {
+ pipeline.addLast(name, handler); // appended at the end, so the call order in getPipeline() is the handler order
+ }
+
+ private SSLContext createSSLContext(NettyServerBootstrapConfiguration configuration) throws Exception {
+ if (!configuration.isSsl()) {
+ return null;
+ }
+
+ // create the ssl context once (called from the constructors); SSL engines are created from it per pipeline
+ if (configuration.getSslContextParameters() != null) {
+ SSLContext context = configuration.getSslContextParameters().createSSLContext();
+ return context;
+ }
+
+ return null; // SSL without SSLContextParameters: configureServerSSLOnDemand() uses the ssl handler or keystore settings
+ }
+
+ /**
+ * Creates the server-side {@link SslHandler}, or returns null when SSL is not enabled.
+ * Precedence: the configured ssl handler, then the shared {@link SSLContext}, then the keystore settings.
+ */
+ private SslHandler configureServerSSLOnDemand() throws Exception {
+ if (!consumer.getConfiguration().isSsl()) {
+ return null;
+ }
+
+ if (consumer.getConfiguration().getSslHandler() != null) {
+ return consumer.getConfiguration().getSslHandler();
+ } else if (sslContext != null) {
+ SSLEngine engine = sslContext.createSSLEngine();
+ engine.setUseClientMode(false);
+ engine.setNeedClientAuth(consumer.getConfiguration().isNeedClientAuth());
+ return new SslHandler(engine);
+ } else {
+ if (consumer.getConfiguration().getKeyStoreFile() == null && consumer.getConfiguration().getKeyStoreResource() == null) {
+ LOG.debug("keystorefile is null");
+ }
+ if (consumer.getConfiguration().getTrustStoreFile() == null && consumer.getConfiguration().getTrustStoreResource() == null) {
+ LOG.debug("truststorefile is null");
+ }
+ // toCharArray() never returns null, so check the passphrase itself; a missing passphrase is passed on as null
+ if (consumer.getConfiguration().getPassphrase() == null) {
+ LOG.debug("passphrase is null");
+ }
+ char[] pw = consumer.getConfiguration().getPassphrase() != null ? consumer.getConfiguration().getPassphrase().toCharArray() : null;
+ SSLEngineFactory sslEngineFactory;
+ if (consumer.getConfiguration().getKeyStoreFile() != null || consumer.getConfiguration().getTrustStoreFile() != null) {
+ sslEngineFactory = new SSLEngineFactory(
+ consumer.getConfiguration().getKeyStoreFormat(), consumer.getConfiguration().getSecurityProvider(),
+ consumer.getConfiguration().getKeyStoreFile(), consumer.getConfiguration().getTrustStoreFile(), pw);
+ } else {
+ sslEngineFactory = new SSLEngineFactory(consumer.getContext().getClassResolver(),
+ consumer.getConfiguration().getKeyStoreFormat(), consumer.getConfiguration().getSecurityProvider(),
+ consumer.getConfiguration().getKeyStoreResource(), consumer.getConfiguration().getTrustStoreResource(), pw);
+ }
+ SSLEngine sslEngine = sslEngineFactory.createServerSSLEngine();
+ sslEngine.setUseClientMode(false);
+ sslEngine.setNeedClientAuth(consumer.getConfiguration().isNeedClientAuth());
+ return new SslHandler(sslEngine);
+ }
+ }
+
+ @Override
+ public ServerPipelineFactory createPipelineFactory(NettyConsumer consumer) {
+ return new DefaultServerPipelineFactory(consumer); // a separate factory instance bound to the given consumer
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/1bb756cd/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyCamelState.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyCamelState.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyCamelState.java
new file mode 100644
index 0000000..19c4bb1
--- /dev/null
+++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyCamelState.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty4;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
+
+/**
+ * Stores state for {@link NettyProducer} when sending messages.
+ * <p/>
+ * This allows the {@link org.apache.camel.component.netty4.handlers.ClientChannelHandler} to access
+ * this state, which is needed so we can get hold of the current {@link Exchange} and the
+ * {@link AsyncCallback} so we can continue routing the message in the Camel routing engine.
+ */
+public final class NettyCamelState {
+
+    // both references are fixed at construction time; the class exposes no setters
+    private final AsyncCallback callback;
+    private final Exchange exchange;
+
+    /**
+     * Captures the callback and the exchange handed over by the caller.
+     *
+     * @param callback the callback later returned by {@link #getCallback()}
+     * @param exchange the exchange later returned by {@link #getExchange()}
+     */
+    public NettyCamelState(AsyncCallback callback, Exchange exchange) {
+        this.exchange = exchange;
+        this.callback = callback;
+    }
+
+    /**
+     * @return the exchange given to the constructor
+     */
+    public Exchange getExchange() {
+        return exchange;
+    }
+
+    /**
+     * @return the callback given to the constructor
+     */
+    public AsyncCallback getCallback() {
+        return callback;
+    }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/1bb756cd/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyClientBossPoolBuilder.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyClientBossPoolBuilder.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyClientBossPoolBuilder.java
new file mode 100644
index 0000000..86d31d6
--- /dev/null
+++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyClientBossPoolBuilder.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty4;
+
+import java.util.concurrent.Executors;
+
+import org.jboss.netty.channel.socket.nio.BossPool;
+import org.jboss.netty.channel.socket.nio.NioClientBossPool;
+import org.jboss.netty.util.HashedWheelTimer;
+
+/**
+ * A builder to create Netty {@link org.jboss.netty.channel.socket.nio.BossPool} which can be used for sharing boss pools
+ * with multiple Netty {@link NettyServerBootstrapFactory} server bootstrap configurations.
+ */
+public final class NettyClientBossPoolBuilder {
+
+    // defaults used when the corresponding setter / with-method is never called
+    private int bossCount = 1;
+    private String pattern;
+    private String name = "NettyClientBoss";
+
+    /**
+     * @param name the name passed to the {@link CamelNettyThreadNameDeterminer} in {@link #build()}
+     */
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    /**
+     * @param pattern the pattern passed to the {@link CamelNettyThreadNameDeterminer} in {@link #build()}
+     */
+    public void setPattern(String pattern) {
+        this.pattern = pattern;
+    }
+
+    /**
+     * @param bossCount the boss count passed to the pool created in {@link #build()}
+     */
+    public void setBossCount(int bossCount) {
+        this.bossCount = bossCount;
+    }
+
+    /**
+     * Fluent variant of {@link #setName(String)}.
+     *
+     * @return this builder
+     */
+    public NettyClientBossPoolBuilder withName(String name) {
+        this.name = name;
+        return this;
+    }
+
+    /**
+     * Fluent variant of {@link #setPattern(String)}.
+     *
+     * @return this builder
+     */
+    public NettyClientBossPoolBuilder withPattern(String pattern) {
+        this.pattern = pattern;
+        return this;
+    }
+
+    /**
+     * Fluent variant of {@link #setBossCount(int)}.
+     *
+     * @return this builder
+     */
+    public NettyClientBossPoolBuilder withBossCount(int bossCount) {
+        this.bossCount = bossCount;
+        return this;
+    }
+
+    /**
+     * Creates a new boss pool from the current settings.
+     * <p/>
+     * Every call creates a fresh cached thread pool and a fresh {@link HashedWheelTimer};
+     * the builder keeps no reference to either of them.
+     */
+    BossPool build() {
+        return new NioClientBossPool(
+                Executors.newCachedThreadPool(),
+                bossCount,
+                new HashedWheelTimer(),
+                new CamelNettyThreadNameDeterminer(pattern, name));
+    }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/1bb756cd/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyComponent.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyComponent.java
new file mode 100644
index 0000000..67a533b
--- /dev/null
+++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyComponent.java
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty4;
+
+import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Endpoint;
+import org.apache.camel.impl.DefaultComponent;
+import org.apache.camel.util.IntrospectionSupport;
+import org.apache.camel.util.concurrent.CamelThreadFactory;
+import org.jboss.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor;
+import org.jboss.netty.util.HashedWheelTimer;
+import org.jboss.netty.util.Timer;
+
+/**
+ * Camel component that creates {@link NettyEndpoint}s for <tt>tcp</tt> and <tt>udp</tt> URIs.
+ * <p/>
+ * All instances share a single static {@link Timer}, which is created in {@link #doStart()} and
+ * stopped in {@link #doStop()}. The ordered worker pool is held per component instance.
+ */
+public class NettyComponent extends DefaultComponent {
+    // use a shared timer for Netty (see javadoc for HashedWheelTimer)
+    private static volatile Timer timer;
+    private NettyConfiguration configuration;
+    private OrderedMemoryAwareThreadPoolExecutor executorService;
+
+    public NettyComponent() {
+    }
+
+    public NettyComponent(CamelContext context) {
+        super(context);
+    }
+
+    /**
+     * Creates a {@link NettyEndpoint} from the uri and its parameters.
+     * <p/>
+     * The endpoint configuration starts as a copy of the component level configuration (or a new
+     * one if none is set), is then populated from the uri, and finally has the properties of an
+     * optional <tt>bootstrapConfiguration</tt> reference merged on top before it is validated.
+     */
+    @Override
+    protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
+        NettyConfiguration config;
+        if (configuration != null) {
+            config = configuration.copy();
+        } else {
+            config = new NettyConfiguration();
+        }
+        config = parseConfiguration(config, remaining, parameters);
+
+        // merge any custom bootstrap configuration on the config
+        NettyServerBootstrapConfiguration bootstrapConfiguration = resolveAndRemoveReferenceParameter(parameters, "bootstrapConfiguration", NettyServerBootstrapConfiguration.class);
+        if (bootstrapConfiguration != null) {
+            Map<String, Object> options = new HashMap<String, Object>();
+            if (IntrospectionSupport.getProperties(bootstrapConfiguration, options, null, false)) {
+                IntrospectionSupport.setProperties(getCamelContext().getTypeConverter(), config, options);
+            }
+        }
+
+        // validate config
+        config.validateConfiguration();
+
+        NettyEndpoint nettyEndpoint = new NettyEndpoint(remaining, this, config);
+        nettyEndpoint.setTimer(getTimer());
+        setProperties(nettyEndpoint.getConfiguration(), parameters);
+        return nettyEndpoint;
+    }
+
+    /**
+     * Parses the configuration
+     *
+     * @return the parsed and valid configuration to use
+     */
+    protected NettyConfiguration parseConfiguration(NettyConfiguration configuration, String remaining, Map<String, Object> parameters) throws Exception {
+        configuration.parseURI(new URI(remaining), parameters, this, "tcp", "udp");
+        return configuration;
+    }
+
+    public NettyConfiguration getConfiguration() {
+        return configuration;
+    }
+
+    public void setConfiguration(NettyConfiguration configuration) {
+        this.configuration = configuration;
+    }
+
+    /**
+     * @return the shared timer, or <tt>null</tt> when no component is currently started
+     */
+    public static Timer getTimer() {
+        return timer;
+    }
+
+    /**
+     * Returns the ordered worker pool, creating it on first use.
+     */
+    public synchronized OrderedMemoryAwareThreadPoolExecutor getExecutorService() {
+        if (executorService == null) {
+            executorService = createExecutorService();
+        }
+        return executorService;
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        if (timer == null) {
+            timer = new HashedWheelTimer();
+        }
+
+        if (configuration == null) {
+            configuration = new NettyConfiguration();
+        }
+        // keep a pool that getExecutorService() may already have created lazily; replacing it
+        // here would orphan the earlier pool, which doStop() could then never shut down
+        if (configuration.isOrderedThreadPoolExecutor() && executorService == null) {
+            executorService = createExecutorService();
+        }
+
+        super.doStart();
+    }
+
+    protected OrderedMemoryAwareThreadPoolExecutor createExecutorService() {
+        // use ordered thread pool, to ensure we process the events in order, and can send back
+        // replies in the expected order. eg this is required by TCP.
+        // and use a Camel thread factory so we have consistent thread namings
+        // we should use a shared thread pool as recommended by Netty
+        String pattern = getCamelContext().getExecutorServiceManager().getThreadNamePattern();
+        ThreadFactory factory = new CamelThreadFactory(pattern, "NettyOrderedWorker", true);
+        return new OrderedMemoryAwareThreadPoolExecutor(configuration.getMaximumPoolSize(),
+                0L, 0L, 30, TimeUnit.SECONDS, factory);
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        // the timer is static: another NettyComponent instance may already have stopped and
+        // cleared it, or it may never have been created, so it must not be dereferenced blindly
+        // NOTE(review): stopping it here also affects every other instance still using it
+        Timer sharedTimer = timer;
+        if (sharedTimer != null) {
+            sharedTimer.stop();
+            timer = null;
+        }
+
+        if (executorService != null) {
+            getCamelContext().getExecutorServiceManager().shutdownNow(executorService);
+            executorService = null;
+        }
+
+        super.doStop();
+    }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/camel/blob/1bb756cd/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyConfiguration.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyConfiguration.java
new file mode 100644
index 0000000..37a3df8
--- /dev/null
+++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyConfiguration.java
@@ -0,0 +1,432 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty4;
+
+import java.io.File;
+import java.net.URI;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.camel.LoggingLevel;
+import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.util.EndpointHelper;
+import org.apache.camel.util.IntrospectionSupport;
+import org.apache.camel.util.ObjectHelper;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.channel.ChannelHandler;
+import org.jboss.netty.handler.codec.frame.Delimiters;
+import org.jboss.netty.handler.ssl.SslHandler;
+import org.jboss.netty.util.CharsetUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class NettyConfiguration extends NettyServerBootstrapConfiguration implements Cloneable {
+ private static final Logger LOG = LoggerFactory.getLogger(NettyConfiguration.class);
+
+ private long requestTimeout;
+ private boolean sync = true;
+ private boolean textline;
+ private TextLineDelimiter delimiter = TextLineDelimiter.LINE;
+ private boolean autoAppendDelimiter = true;
+ private int decoderMaxLineLength = 1024;
+ private String encoding;
+ private List<ChannelHandler> encoders = new ArrayList<ChannelHandler>();
+ private List<ChannelHandler> decoders = new ArrayList<ChannelHandler>();
+ private boolean disconnect;
+ private boolean lazyChannelCreation = true;
+ private boolean transferExchange;
+ private boolean disconnectOnNoReply = true;
+ private LoggingLevel noReplyLogLevel = LoggingLevel.WARN;
+ private LoggingLevel serverExceptionCaughtLogLevel = LoggingLevel.WARN;
+ private LoggingLevel serverClosedChannelExceptionCaughtLogLevel = LoggingLevel.DEBUG;
+ private boolean allowDefaultCodec = true;
+ private ClientPipelineFactory clientPipelineFactory;
+ private int maximumPoolSize = 16;
+ private boolean orderedThreadPoolExecutor = true;
+ private int producerPoolMaxActive = -1;
+ private int producerPoolMinIdle;
+ private int producerPoolMaxIdle = 100;
+ private long producerPoolMinEvictableIdle = 5 * 60 * 1000L;
+ private boolean producerPoolEnabled = true;
+
+    /**
+     * Creates a copy of this configuration.
+     * <p/>
+     * The copy is produced with {@link #clone()}; afterwards its encoder and decoder lists are
+     * replaced by new lists holding the same handler instances, so that adding or removing
+     * handlers on the copy does not affect this instance.
+     *
+     * @return the copy
+     * @throws RuntimeCamelException wrapping the {@link CloneNotSupportedException} if cloning fails
+     */
+    public NettyConfiguration copy() {
+        NettyConfiguration duplicate;
+        try {
+            duplicate = (NettyConfiguration) clone();
+        } catch (CloneNotSupportedException e) {
+            throw new RuntimeCamelException(e);
+        }
+        // give the copy its own list instances
+        duplicate.setEncoders(new ArrayList<ChannelHandler>(encoders));
+        duplicate.setDecoders(new ArrayList<ChannelHandler>(decoders));
+        return duplicate;
+    }
+
+    /**
+     * Logs a warning for every configured handler that is neither a {@link ChannelHandlerFactory}
+     * nor annotated with {@link ChannelHandler.Sharable}.
+     * <p/>
+     * The encoders, the decoders and (when set) the sslHandler are inspected. Nothing is thrown
+     * and the configuration is not changed; problems are only reported at WARN level.
+     */
+    public void validateConfiguration() {
+        // an encoder is fine when it is a handler factory or is marked as sharable
+        for (ChannelHandler encoder : encoders) {
+            boolean usable = encoder instanceof ChannelHandlerFactory
+                    || ObjectHelper.getAnnotation(encoder, ChannelHandler.Sharable.class) != null;
+            if (!usable) {
+                LOG.warn("The encoder {} is not @Shareable or an ChannelHandlerFactory instance. The encoder cannot safely be used.", encoder);
+            }
+        }
+
+        // the same rule applies to the decoders
+        for (ChannelHandler decoder : decoders) {
+            boolean usable = decoder instanceof ChannelHandlerFactory
+                    || ObjectHelper.getAnnotation(decoder, ChannelHandler.Sharable.class) != null;
+            if (!usable) {
+                LOG.warn("The decoder {} is not @Shareable or an ChannelHandlerFactory instance. The decoder cannot safely be used.", decoder);
+            }
+        }
+
+        // and to the optional ssl handler
+        if (sslHandler == null) {
+            return;
+        }
+        boolean factory = sslHandler instanceof ChannelHandlerFactory;
+        boolean shareable = ObjectHelper.getAnnotation(sslHandler, ChannelHandler.Sharable.class) != null;
+        if (factory || shareable) {
+            return;
+        }
+        LOG.warn("The sslHandler {} is not @Shareable or an ChannelHandlerFactory instance. The sslHandler cannot safely be used.", sslHandler);
+    }
+
+ /**
+ * Populates this configuration from an endpoint uri and its parameters.
+ * <p/>
+ * The uri scheme becomes the protocol and must match one of the supported protocols
+ * (compared case-insensitively). Host and port are taken from the uri. SSL and pipeline
+ * factory options are read from the parameters first, then the custom encoders and decoders,
+ * and whatever is left is applied through {@link EndpointHelper}. Parameters prefixed with
+ * <tt>option.</tt> are collected into the options map. Default codecs are only added when
+ * neither encoders nor decoders were configured and allowDefaultCodec is enabled.
+ *
+ * @param uri the endpoint uri providing scheme, host and port
+ * @param parameters the endpoint parameters
+ * @param component the component used to look up and resolve the parameters
+ * @param supportedProtocols the accepted uri schemes
+ * @throws IllegalArgumentException if the scheme is missing or not among the supported protocols
+ */
+ public void parseURI(URI uri, Map<String, Object> parameters, NettyComponent component, String... supportedProtocols) throws Exception {
+ protocol = uri.getScheme();
+
+ // a null scheme never matches, so it ends up in the IllegalArgumentException below
+ boolean found = false;
+ for (String supportedProtocol : supportedProtocols) {
+ if (protocol != null && protocol.equalsIgnoreCase(supportedProtocol)) {
+ found = true;
+ break;
+ }
+ }
+
+ if (!found) {
+ throw new IllegalArgumentException("Unrecognized Netty protocol: " + protocol + " for uri: " + uri);
+ }
+
+ setHost(uri.getHost());
+ setPort(uri.getPort());
+
+ // going by their names, the getAndRemove/resolveAndRemove helpers take each value out of the
+ // parameters map, so these options are presumably no longer visible to the generic property
+ // setters further down - confirm against the component base class
+ // NOTE(review): ssl falls back to the literal false, whereas the options below fall back to
+ // the current field value (or to "JKS" / "SunX509" when that is null) - confirm this is intended
+ ssl = component.getAndRemoveParameter(parameters, "ssl", boolean.class, false);
+ sslHandler = component.resolveAndRemoveReferenceParameter(parameters, "sslHandler", SslHandler.class, sslHandler);
+ passphrase = component.getAndRemoveParameter(parameters, "passphrase", String.class, passphrase);
+ keyStoreFormat = component.getAndRemoveParameter(parameters, "keyStoreFormat", String.class, keyStoreFormat == null ? "JKS" : keyStoreFormat);
+ securityProvider = component.getAndRemoveParameter(parameters, "securityProvider", String.class, securityProvider == null ? "SunX509" : securityProvider);
+ keyStoreFile = component.resolveAndRemoveReferenceParameter(parameters, "keyStoreFile", File.class, keyStoreFile);
+ trustStoreFile = component.resolveAndRemoveReferenceParameter(parameters, "trustStoreFile", File.class, trustStoreFile);
+ keyStoreResource = component.getAndRemoveParameter(parameters, "keyStoreResource", String.class, keyStoreResource);
+ trustStoreResource = component.getAndRemoveParameter(parameters, "trustStoreResource", String.class, trustStoreResource);
+ clientPipelineFactory = component.resolveAndRemoveReferenceParameter(parameters, "clientPipelineFactory", ClientPipelineFactory.class, clientPipelineFactory);
+ serverPipelineFactory = component.resolveAndRemoveReferenceParameter(parameters, "serverPipelineFactory", ServerPipelineFactory.class, serverPipelineFactory);
+
+ // set custom encoders and decoders first
+ List<ChannelHandler> referencedEncoders = component.resolveAndRemoveReferenceListParameter(parameters, "encoders", ChannelHandler.class, null);
+ addToHandlersList(encoders, referencedEncoders, ChannelHandler.class);
+ List<ChannelHandler> referencedDecoders = component.resolveAndRemoveReferenceListParameter(parameters, "decoders", ChannelHandler.class, null);
+ addToHandlersList(decoders, referencedDecoders, ChannelHandler.class);
+
+ // then set parameters with the help of the camel context type converters
+ EndpointHelper.setReferenceProperties(component.getCamelContext(), this, parameters);
+ EndpointHelper.setProperties(component.getCamelContext(), this, parameters);
+
+ // additional netty options, we don't want to store an empty map, so set it as null if empty
+ options = IntrospectionSupport.extractProperties(parameters, "option.");
+ if (options != null && options.isEmpty()) {
+ options = null;
+ }
+
+ // add default encoders and decoders
+ if (encoders.isEmpty() && decoders.isEmpty()) {
+ if (allowDefaultCodec) {
+ // are we textline or object?
+ if (isTextline()) {
+ // the configured encoding wins, otherwise UTF-8 is used
+ Charset charset = getEncoding() != null ? Charset.forName(getEncoding()) : CharsetUtil.UTF_8;
+ encoders.add(ChannelHandlerFactories.newStringEncoder(charset));
+ // any delimiter other than LINE results in the nul delimiter
+ ChannelBuffer[] delimiters = delimiter == TextLineDelimiter.LINE ? Delimiters.lineDelimiter() : Delimiters.nulDelimiter();
+ decoders.add(ChannelHandlerFactories.newDelimiterBasedFrameDecoder(decoderMaxLineLength, delimiters));
+ decoders.add(ChannelHandlerFactories.newStringDecoder(charset));
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Using textline encoders and decoders with charset: {}, delimiter: {} and decoderMaxLineLength: {}",
+ new Object[]{charset, delimiter, decoderMaxLineLength});
+ }
+ } else {
+ // object serializable is then used
+ encoders.add(ChannelHandlerFactories.newObjectEncoder());
+ decoders.add(ChannelHandlerFactories.newObjectDecoder());
+
+ LOG.debug("Using object encoders and decoders");
+ }
+ } else {
+ LOG.debug("No encoders and decoders will be used");
+ }
+ } else {
+ LOG.debug("Using configured encoders and/or decoders");
+ }
+ }
+
+    /**
+     * Resolves the configured encoding to the canonical name of its charset.
+     *
+     * @return the canonical charset name, or <tt>null</tt> when no encoding is configured
+     * @throws IllegalArgumentException if the configured encoding is not supported
+     */
+    public String getCharsetName() {
+        if (encoding != null) {
+            if (Charset.isSupported(encoding)) {
+                return Charset.forName(encoding).name();
+            }
+            throw new IllegalArgumentException("The encoding: " + encoding + " is not supported");
+        }
+        return null;
+    }
+
+ public long getRequestTimeout() {
+ return requestTimeout;
+ }
+
+ public void setRequestTimeout(long requestTimeout) {
+ this.requestTimeout = requestTimeout;
+ }
+
+ public boolean isSync() {
+ return sync;
+ }
+
+ public void setSync(boolean sync) {
+ this.sync = sync;
+ }
+
+ public boolean isTextline() {
+ return textline;
+ }
+
+ public void setTextline(boolean textline) {
+ this.textline = textline;
+ }
+
+ public int getDecoderMaxLineLength() {
+ return decoderMaxLineLength;
+ }
+
+ public void setDecoderMaxLineLength(int decoderMaxLineLength) {
+ this.decoderMaxLineLength = decoderMaxLineLength;
+ }
+
+ public TextLineDelimiter getDelimiter() {
+ return delimiter;
+ }
+
+ public void setDelimiter(TextLineDelimiter delimiter) {
+ this.delimiter = delimiter;
+ }
+
+ public boolean isAutoAppendDelimiter() {
+ return autoAppendDelimiter;
+ }
+
+ public void setAutoAppendDelimiter(boolean autoAppendDelimiter) {
+ this.autoAppendDelimiter = autoAppendDelimiter;
+ }
+
+ public String getEncoding() {
+ return encoding;
+ }
+
+ public void setEncoding(String encoding) {
+ this.encoding = encoding;
+ }
+
+ public List<ChannelHandler> getDecoders() {
+ return decoders;
+ }
+
+ public void setDecoders(List<ChannelHandler> decoders) {
+ this.decoders = decoders;
+ }
+
+ public List<ChannelHandler> getEncoders() {
+ return encoders;
+ }
+
+ public void setEncoders(List<ChannelHandler> encoders) {
+ this.encoders = encoders;
+ }
+
+ public ChannelHandler getEncoder() {
+ return encoders.isEmpty() ? null : encoders.get(0);
+ }
+
+ public void setEncoder(ChannelHandler encoder) {
+ if (!encoders.contains(encoder)) {
+ encoders.add(encoder);
+ }
+ }
+
+ public ChannelHandler getDecoder() {
+ return decoders.isEmpty() ? null : decoders.get(0);
+ }
+
+ public void setDecoder(ChannelHandler decoder) {
+ if (!decoders.contains(decoder)) {
+ decoders.add(decoder);
+ }
+ }
+
+ public boolean isDisconnect() {
+ return disconnect;
+ }
+
+ public void setDisconnect(boolean disconnect) {
+ this.disconnect = disconnect;
+ }
+
+ public boolean isLazyChannelCreation() {
+ return lazyChannelCreation;
+ }
+
+ public void setLazyChannelCreation(boolean lazyChannelCreation) {
+ this.lazyChannelCreation = lazyChannelCreation;
+ }
+
+ public boolean isTransferExchange() {
+ return transferExchange;
+ }
+
+ public void setTransferExchange(boolean transferExchange) {
+ this.transferExchange = transferExchange;
+ }
+
+ public boolean isDisconnectOnNoReply() {
+ return disconnectOnNoReply;
+ }
+
+ public void setDisconnectOnNoReply(boolean disconnectOnNoReply) {
+ this.disconnectOnNoReply = disconnectOnNoReply;
+ }
+
+ public LoggingLevel getNoReplyLogLevel() {
+ return noReplyLogLevel;
+ }
+
+ public void setNoReplyLogLevel(LoggingLevel noReplyLogLevel) {
+ this.noReplyLogLevel = noReplyLogLevel;
+ }
+
+ public LoggingLevel getServerExceptionCaughtLogLevel() {
+ return serverExceptionCaughtLogLevel;
+ }
+
+ public void setServerExceptionCaughtLogLevel(LoggingLevel serverExceptionCaughtLogLevel) {
+ this.serverExceptionCaughtLogLevel = serverExceptionCaughtLogLevel;
+ }
+
+ public LoggingLevel getServerClosedChannelExceptionCaughtLogLevel() {
+ return serverClosedChannelExceptionCaughtLogLevel;
+ }
+
+ public void setServerClosedChannelExceptionCaughtLogLevel(LoggingLevel serverClosedChannelExceptionCaughtLogLevel) {
+ this.serverClosedChannelExceptionCaughtLogLevel = serverClosedChannelExceptionCaughtLogLevel;
+ }
+
+ public boolean isAllowDefaultCodec() {
+ return allowDefaultCodec;
+ }
+
+ public void setAllowDefaultCodec(boolean allowDefaultCodec) {
+ this.allowDefaultCodec = allowDefaultCodec;
+ }
+
+ public void setClientPipelineFactory(ClientPipelineFactory clientPipelineFactory) {
+ this.clientPipelineFactory = clientPipelineFactory;
+ }
+
+ public ClientPipelineFactory getClientPipelineFactory() {
+ return clientPipelineFactory;
+ }
+
+ public int getMaximumPoolSize() {
+ return maximumPoolSize;
+ }
+
+ public void setMaximumPoolSize(int maximumPoolSize) {
+ this.maximumPoolSize = maximumPoolSize;
+ }
+
+ public boolean isOrderedThreadPoolExecutor() {
+ return orderedThreadPoolExecutor;
+ }
+
+ public void setOrderedThreadPoolExecutor(boolean orderedThreadPoolExecutor) {
+ this.orderedThreadPoolExecutor = orderedThreadPoolExecutor;
+ }
+
+ public int getProducerPoolMaxActive() {
+ return producerPoolMaxActive;
+ }
+
+ public void setProducerPoolMaxActive(int producerPoolMaxActive) {
+ this.producerPoolMaxActive = producerPoolMaxActive;
+ }
+
+ public int getProducerPoolMinIdle() {
+ return producerPoolMinIdle;
+ }
+
+ public void setProducerPoolMinIdle(int producerPoolMinIdle) {
+ this.producerPoolMinIdle = producerPoolMinIdle;
+ }
+
+ public int getProducerPoolMaxIdle() {
+ return producerPoolMaxIdle;
+ }
+
+ public void setProducerPoolMaxIdle(int producerPoolMaxIdle) {
+ this.producerPoolMaxIdle = producerPoolMaxIdle;
+ }
+
+ public long getProducerPoolMinEvictableIdle() {
+ return producerPoolMinEvictableIdle;
+ }
+
+ public void setProducerPoolMinEvictableIdle(long producerPoolMinEvictableIdle) {
+ this.producerPoolMinEvictableIdle = producerPoolMinEvictableIdle;
+ }
+
+ public boolean isProducerPoolEnabled() {
+ return producerPoolEnabled;
+ }
+
+ public void setProducerPoolEnabled(boolean producerPoolEnabled) {
+ this.producerPoolEnabled = producerPoolEnabled;
+ }
+
+    /**
+     * Appends to <tt>configured</tt> every element of <tt>handlers</tt> that is an instance of
+     * <tt>handlerType</tt>. A <tt>null</tt> handlers list is ignored.
+     */
+    private static <T> void addToHandlersList(List<T> configured, List<T> handlers, Class<T> handlerType) {
+        if (handlers == null) {
+            return;
+        }
+        for (T candidate : handlers) {
+            if (!handlerType.isInstance(candidate)) {
+                // skips null elements and anything of another type
+                continue;
+            }
+            configured.add(candidate);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/1bb756cd/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyConstants.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyConstants.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyConstants.java
new file mode 100644
index 0000000..e381e61
--- /dev/null
+++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyConstants.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty4;
+
+/**
+ * Netty constants
+ *
+ * @version
+ */
+public final class NettyConstants {
+
+ // String keys, all prefixed with "CamelNetty".
+ // NOTE(review): presumably used as exchange header/property names - confirm at the usage sites
+ public static final String NETTY_CLOSE_CHANNEL_WHEN_COMPLETE = "CamelNettyCloseChannelWhenComplete";
+ public static final String NETTY_CHANNEL_HANDLER_CONTEXT = "CamelNettyChannelHandlerContext";
+ public static final String NETTY_MESSAGE_EVENT = "CamelNettyMessageEvent";
+ public static final String NETTY_REMOTE_ADDRESS = "CamelNettyRemoteAddress";
+ public static final String NETTY_LOCAL_ADDRESS = "CamelNettyLocalAddress";
+ // keys named after the SSL session and the client certificate details
+ // (subject, issuer, serial number, validity window)
+ public static final String NETTY_SSL_SESSION = "CamelNettySSLSession";
+ public static final String NETTY_SSL_CLIENT_CERT_SUBJECT_NAME = "CamelNettySSLClientCertSubjectName";
+ public static final String NETTY_SSL_CLIENT_CERT_ISSUER_NAME = "CamelNettySSLClientCertIssuerName";
+ public static final String NETTY_SSL_CLIENT_CERT_SERIAL_NO = "CamelNettySSLClientCertSerialNumber";
+ public static final String NETTY_SSL_CLIENT_CERT_NOT_BEFORE = "CamelNettySSLClientCertNotBefore";
+ public static final String NETTY_SSL_CLIENT_CERT_NOT_AFTER = "CamelNettySSLClientCertNotAfter";
+
+ // not instantiable: the class only holds constants
+ private NettyConstants() {
+ // Utility class
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/1bb756cd/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyConsumer.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyConsumer.java
new file mode 100644
index 0000000..902e4d9
--- /dev/null
+++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyConsumer.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty4;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Processor;
+import org.apache.camel.impl.DefaultConsumer;
+import org.apache.camel.util.ServiceHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Consumer that starts a {@link NettyServerBootstrapFactory} for the configured address.
+ * <p/>
+ * A bootstrap factory supplied by the configuration is used as-is; otherwise a single TCP or
+ * UDP bootstrap factory is created on start, depending on the configured protocol.
+ */
+public class NettyConsumer extends DefaultConsumer {
+    private static final Logger LOG = LoggerFactory.getLogger(NettyConsumer.class);
+    private CamelContext context;
+    private NettyConfiguration configuration;
+    private NettyServerBootstrapFactory nettyServerBootstrapFactory;
+
+    public NettyConsumer(NettyEndpoint nettyEndpoint, Processor processor, NettyConfiguration configuration) {
+        super(nettyEndpoint, processor);
+        this.context = this.getEndpoint().getCamelContext();
+        this.configuration = configuration;
+        setNettyServerBootstrapFactory(configuration.getNettyServerBootstrapFactory());
+        setExceptionHandler(new NettyConsumerExceptionHandler(this));
+    }
+
+    @Override
+    public NettyEndpoint getEndpoint() {
+        return (NettyEndpoint) super.getEndpoint();
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        super.doStart();
+
+        LOG.debug("Netty consumer binding to: {}", configuration.getAddress());
+
+        if (nettyServerBootstrapFactory == null) {
+            // setup pipeline factory: a configured factory acts as a prototype that creates
+            // the actual factory for this consumer, otherwise the default one is used
+            ServerPipelineFactory pipelineFactory;
+            ServerPipelineFactory factory = configuration.getServerPipelineFactory();
+            if (factory != null) {
+                pipelineFactory = factory.createPipelineFactory(this);
+            } else {
+                pipelineFactory = new DefaultServerPipelineFactory(this);
+            }
+
+            if (isTcp()) {
+                nettyServerBootstrapFactory = new SingleTCPNettyServerBootstrapFactory();
+            } else {
+                nettyServerBootstrapFactory = new SingleUDPNettyServerBootstrapFactory();
+            }
+            nettyServerBootstrapFactory.init(context, configuration, pipelineFactory);
+        }
+
+        ServiceHelper.startServices(nettyServerBootstrapFactory);
+
+        // placeholder form, consistent with the debug statements in this class
+        LOG.info("Netty consumer bound to: {}", configuration.getAddress());
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        LOG.debug("Netty consumer unbinding from: {}", configuration.getAddress());
+
+        ServiceHelper.stopService(nettyServerBootstrapFactory);
+
+        LOG.info("Netty consumer unbound from: {}", configuration.getAddress());
+
+        super.doStop();
+    }
+
+    public CamelContext getContext() {
+        return context;
+    }
+
+    public NettyConfiguration getConfiguration() {
+        return configuration;
+    }
+
+    public void setConfiguration(NettyConfiguration configuration) {
+        this.configuration = configuration;
+    }
+
+    public NettyServerBootstrapFactory getNettyServerBootstrapFactory() {
+        return nettyServerBootstrapFactory;
+    }
+
+    public void setNettyServerBootstrapFactory(NettyServerBootstrapFactory nettyServerBootstrapFactory) {
+        this.nettyServerBootstrapFactory = nettyServerBootstrapFactory;
+    }
+
+    /**
+     * @return <tt>true</tt> if the configured protocol is <tt>tcp</tt> (ignoring case)
+     */
+    protected boolean isTcp() {
+        return configuration.getProtocol().equalsIgnoreCase("tcp");
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/1bb756cd/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyConsumerExceptionHandler.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyConsumerExceptionHandler.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyConsumerExceptionHandler.java
new file mode 100644
index 0000000..75b1aad
--- /dev/null
+++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyConsumerExceptionHandler.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty4;
+
+import java.nio.channels.ClosedChannelException;
+
+import org.apache.camel.CamelExchangeException;
+import org.apache.camel.Exchange;
+import org.apache.camel.LoggingLevel;
+import org.apache.camel.spi.ExceptionHandler;
+import org.apache.camel.util.CamelLogger;
+import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+ /**
+  * An {@link ExceptionHandler} that logs exceptions using the log levels taken
+  * from the configuration of the given {@link NettyConsumer}.
+  * <p/>
+  * Exceptions whose cause chain contains a {@link ClosedChannelException} are logged
+  * at a separately configured level; all other exceptions use the general level.
+  */
+ public class NettyConsumerExceptionHandler implements ExceptionHandler {
+
+     // logs under the NettyConsumer category
+     private static final Logger LOG = LoggerFactory.getLogger(NettyConsumer.class);
+     private final CamelLogger logger;
+     private final LoggingLevel closedLoggingLevel;
+
+     /**
+      * Creates the handler, reading both log levels from the consumer's configuration.
+      *
+      * @param consumer the consumer whose configuration supplies the log levels
+      */
+     public NettyConsumerExceptionHandler(NettyConsumer consumer) {
+         this.logger = new CamelLogger(LOG, consumer.getConfiguration().getServerExceptionCaughtLogLevel());
+         this.closedLoggingLevel = consumer.getConfiguration().getServerClosedChannelExceptionCaughtLogLevel();
+     }
+
+     @Override
+     public void handleException(Throwable exception) {
+         // no message and no exchange available
+         handleException(null, null, exception);
+     }
+
+     @Override
+     public void handleException(String message, Throwable exception) {
+         // no exchange available
+         handleException(message, null, exception);
+     }
+
+     @Override
+     public void handleException(String message, Exchange exchange, Throwable exception) {
+         try {
+             final String text = CamelExchangeException.createExceptionMessage(message, exchange, exception);
+             if (ObjectHelper.getException(ClosedChannelException.class, exception) == null) {
+                 // not caused by a closed channel: use the logger's own level
+                 logger.log(text, exception);
+             } else {
+                 // closed channel: use the dedicated level
+                 logger.log(text, exception, closedLoggingLevel);
+             }
+         } catch (Throwable ignored) {
+             // the logging exception handler must not cause new exceptions to occur
+         }
+     }
+
+ }
http://git-wip-us.apache.org/repos/asf/camel/blob/1bb756cd/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyConverter.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyConverter.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyConverter.java
new file mode 100644
index 0000000..aa441d3
--- /dev/null
+++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyConverter.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty4;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInput;
+import java.io.ObjectInputStream;
+import java.io.UnsupportedEncodingException;
+import javax.xml.transform.dom.DOMSource;
+import javax.xml.transform.sax.SAXSource;
+import javax.xml.transform.stax.StAXSource;
+import javax.xml.transform.stream.StreamSource;
+
+import org.w3c.dom.Document;
+
+import org.apache.camel.Converter;
+import org.apache.camel.Exchange;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBufferInputStream;
+import org.jboss.netty.buffer.ChannelBuffers;
+
+/**
+ * A set of converter methods for working with Netty types
+ *
+ * @version
+ */
+@Converter
+public final class NettyConverter {
+
+ private NettyConverter() {
+ //Utility Class
+ }
+
+ @Converter
+ public static byte[] toByteArray(ChannelBuffer buffer, Exchange exchange) {
+ return buffer.array();
+ }
+
+ @Converter
+ public static String toString(ChannelBuffer buffer, Exchange exchange) throws UnsupportedEncodingException {
+ byte[] bytes = toByteArray(buffer, exchange);
+ // use type converter as it can handle encoding set on the Exchange
+ if (exchange != null) {
+ return exchange.getContext().getTypeConverter().convertTo(String.class, exchange, bytes);
+ }
+ return new String(bytes, "UTF-8");
+ }
+
+ @Converter
+ public static InputStream toInputStream(ChannelBuffer buffer, Exchange exchange) {
+ return new ChannelBufferInputStream(buffer);
+ }
+
+ @Converter
+ public static ObjectInput toObjectInput(ChannelBuffer buffer, Exchange exchange) throws IOException {
+ InputStream is = toInputStream(buffer, exchange);
+ return new ObjectInputStream(is);
+ }
+
+ @Converter
+ public static ChannelBuffer toByteBuffer(byte[] bytes, Exchange exchange) {
+ ChannelBuffer buf = ChannelBuffers.dynamicBuffer(bytes.length);
+ buf.writeBytes(bytes);
+ return buf;
+ }
+
+ @Converter
+ public static ChannelBuffer toByteBuffer(String s, Exchange exchange) {
+ byte[] bytes;
+ if (exchange != null) {
+ // use type converter as it can handle encoding set on the Exchange
+ bytes = exchange.getContext().getTypeConverter().convertTo(byte[].class, exchange, s);
+ } else {
+ bytes = s.getBytes();
+ }
+ return toByteBuffer(bytes, exchange);
+ }
+
+ @Converter
+ public static Document toDocument(ChannelBuffer buffer, Exchange exchange) {
+ InputStream is = toInputStream(buffer, exchange);
+ return exchange.getContext().getTypeConverter().convertTo(Document.class, exchange, is);
+ }
+
+ @Converter
+ public static DOMSource toDOMSource(ChannelBuffer buffer, Exchange exchange) {
+ InputStream is = toInputStream(buffer, exchange);
+ return exchange.getContext().getTypeConverter().convertTo(DOMSource.class, exchange, is);
+ }
+
+ @Converter
+ public static SAXSource toSAXSource(ChannelBuffer buffer, Exchange exchange) {
+ InputStream is = toInputStream(buffer, exchange);
+ return exchange.getContext().getTypeConverter().convertTo(SAXSource.class, exchange, is);
+ }
+
+ @Converter
+ public static StreamSource toStreamSource(ChannelBuffer buffer, Exchange exchange) {
+ InputStream is = toInputStream(buffer, exchange);
+ return exchange.getContext().getTypeConverter().convertTo(StreamSource.class, exchange, is);
+ }
+
+ @Converter
+ public static StAXSource toStAXSource(ChannelBuffer buffer, Exchange exchange) {
+ InputStream is = toInputStream(buffer, exchange);
+ return exchange.getContext().getTypeConverter().convertTo(StAXSource.class, exchange, is);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/1bb756cd/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyEndpoint.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyEndpoint.java
new file mode 100644
index 0000000..3bc9e90
--- /dev/null
+++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyEndpoint.java
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty4;
+
+import java.math.BigInteger;
+import java.security.Principal;
+import javax.net.ssl.SSLPeerUnverifiedException;
+import javax.net.ssl.SSLSession;
+import javax.security.cert.X509Certificate;
+
+import org.apache.camel.Consumer;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.impl.DefaultEndpoint;
+import org.apache.camel.impl.SynchronousDelegateProducer;
+import org.apache.camel.util.ObjectHelper;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.handler.ssl.SslHandler;
+import org.jboss.netty.util.Timer;
+
+ /**
+  * Endpoint that creates {@link NettyConsumer}s and {@link NettyProducer}s sharing one
+  * {@link NettyConfiguration}, and that builds exchanges from incoming Netty message events.
+  */
+ public class NettyEndpoint extends DefaultEndpoint {
+     private NettyConfiguration configuration;
+     private Timer timer;
+
+     public NettyEndpoint(String endpointUri, NettyComponent component, NettyConfiguration configuration) {
+         super(endpointUri, component);
+         this.configuration = configuration;
+     }
+
+     public Consumer createConsumer(Processor processor) throws Exception {
+         Consumer answer = new NettyConsumer(this, processor, configuration);
+         configureConsumer(answer);
+         return answer;
+     }
+
+     public Producer createProducer() throws Exception {
+         Producer answer = new NettyProducer(this, configuration);
+         if (isSynchronous()) {
+             return new SynchronousDelegateProducer(answer);
+         } else {
+             return answer;
+         }
+     }
+
+     /**
+      * Creates an exchange for the given message event, populating the Netty headers
+      * and setting the event's message as the IN payload.
+      *
+      * @param ctx          the channel handler context the event was received on
+      * @param messageEvent the received message event
+      * @return the new exchange
+      */
+     public Exchange createExchange(ChannelHandlerContext ctx, MessageEvent messageEvent) throws Exception {
+         Exchange exchange = createExchange();
+         updateMessageHeader(exchange.getIn(), ctx, messageEvent);
+         NettyPayloadHelper.setIn(exchange, messageEvent.getMessage());
+         return exchange;
+     }
+
+     public boolean isSingleton() {
+         return true;
+     }
+
+     @Override
+     public NettyComponent getComponent() {
+         return (NettyComponent) super.getComponent();
+     }
+
+     public NettyConfiguration getConfiguration() {
+         return configuration;
+     }
+
+     public void setConfiguration(NettyConfiguration configuration) {
+         this.configuration = configuration;
+     }
+
+     public void setTimer(Timer timer) {
+         this.timer = timer;
+     }
+
+     public Timer getTimer() {
+         return timer;
+     }
+
+     @Override
+     protected String createEndpointUri() {
+         ObjectHelper.notNull(configuration, "configuration");
+         return "netty4:" + getConfiguration().getProtocol() + "://" + getConfiguration().getHost() + ":" + getConfiguration().getPort();
+     }
+
+     @Override
+     protected void doStart() throws Exception {
+         // the timer must have been set before the endpoint is started
+         ObjectHelper.notNull(timer, "timer");
+     }
+
+     /**
+      * Looks up the {@link SslHandler} in the pipeline of the given context and returns
+      * the session of its SSL engine.
+      *
+      * @param ctx the channel handler context
+      * @return the SSL session, or <tt>null</tt> if the pipeline has no {@link SslHandler}
+      */
+     protected SSLSession getSSLSession(ChannelHandlerContext ctx) {
+         final SslHandler sslHandler = ctx.getPipeline().get(SslHandler.class);
+         SSLSession sslSession = null;
+         if (sslHandler != null) {
+             sslSession = sslHandler.getEngine().getSession();
+         }
+         return sslSession;
+     }
+
+     /**
+      * Sets the Netty specific headers (handler context, message event, remote and local address)
+      * on the message and, when SSL is enabled in the configuration, the SSL session header
+      * and optionally the client certificate headers.
+      *
+      * @param in           the message to update
+      * @param ctx          the channel handler context
+      * @param messageEvent the received message event
+      */
+     protected void updateMessageHeader(Message in, ChannelHandlerContext ctx, MessageEvent messageEvent) {
+         in.setHeader(NettyConstants.NETTY_CHANNEL_HANDLER_CONTEXT, ctx);
+         in.setHeader(NettyConstants.NETTY_MESSAGE_EVENT, messageEvent);
+         in.setHeader(NettyConstants.NETTY_REMOTE_ADDRESS, messageEvent.getRemoteAddress());
+         in.setHeader(NettyConstants.NETTY_LOCAL_ADDRESS, messageEvent.getChannel().getLocalAddress());
+
+         if (configuration.isSsl()) {
+             // setup the SslSession header
+             SSLSession sslSession = getSSLSession(ctx);
+             in.setHeader(NettyConstants.NETTY_SSL_SESSION, sslSession);
+
+             // enrich headers with details from the client certificate if option is enabled
+             if (configuration.isSslClientCertHeaders()) {
+                 enrichWithClientCertInformation(sslSession, in);
+             }
+         }
+     }
+
+     /**
+      * Enriches the message with client certificate details such as subject name, serial number etc.
+      * <p/>
+      * If there is no SSL session or the certificate is unverified then the headers are not enriched.
+      *
+      * @param sslSession the SSL session, may be <tt>null</tt>
+      * @param message    the message to enrich
+      */
+     protected void enrichWithClientCertInformation(SSLSession sslSession, Message message) {
+         if (sslSession == null) {
+             // getSSLSession returns null when the pipeline has no SslHandler; nothing to enrich
+             return;
+         }
+         try {
+             X509Certificate[] certificates = sslSession.getPeerCertificateChain();
+             if (certificates != null && certificates.length > 0) {
+                 // only the first certificate of the chain is used
+                 X509Certificate cert = certificates[0];
+
+                 Principal subject = cert.getSubjectDN();
+                 if (subject != null) {
+                     message.setHeader(NettyConstants.NETTY_SSL_CLIENT_CERT_SUBJECT_NAME, subject.getName());
+                 }
+                 Principal issuer = cert.getIssuerDN();
+                 if (issuer != null) {
+                     message.setHeader(NettyConstants.NETTY_SSL_CLIENT_CERT_ISSUER_NAME, issuer.getName());
+                 }
+                 BigInteger serial = cert.getSerialNumber();
+                 if (serial != null) {
+                     message.setHeader(NettyConstants.NETTY_SSL_CLIENT_CERT_SERIAL_NO, serial.toString());
+                 }
+                 message.setHeader(NettyConstants.NETTY_SSL_CLIENT_CERT_NOT_BEFORE, cert.getNotBefore());
+                 message.setHeader(NettyConstants.NETTY_SSL_CLIENT_CERT_NOT_AFTER, cert.getNotAfter());
+             }
+         } catch (SSLPeerUnverifiedException e) {
+             // ignore: the peer is unverified, so no certificate headers are set
+         }
+     }
+
+ }
\ No newline at end of file