You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ak...@apache.org on 2010/03/26 06:46:08 UTC

svn commit: r927698 [1/2] - in /camel/trunk: components/ components/camel-netty/ components/camel-netty/src/ components/camel-netty/src/main/ components/camel-netty/src/main/java/ components/camel-netty/src/main/java/org/ components/camel-netty/src/mai...

Author: akarpe
Date: Fri Mar 26 05:46:07 2010
New Revision: 927698

URL: http://svn.apache.org/viewvc?rev=927698&view=rev
Log:
CAMEL-2371 Added initial version of the Netty component

Added:
    camel/trunk/components/camel-netty/
    camel/trunk/components/camel-netty/pom.xml   (with props)
    camel/trunk/components/camel-netty/src/
    camel/trunk/components/camel-netty/src/main/
    camel/trunk/components/camel-netty/src/main/java/
    camel/trunk/components/camel-netty/src/main/java/org/
    camel/trunk/components/camel-netty/src/main/java/org/apache/
    camel/trunk/components/camel-netty/src/main/java/org/apache/camel/
    camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/
    camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/
    camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ClientPipelineFactory.java   (with props)
    camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyComponent.java   (with props)
    camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java   (with props)
    camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java   (with props)
    camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyEndpoint.java   (with props)
    camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java   (with props)
    camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ServerPipelineFactory.java   (with props)
    camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/
    camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java   (with props)
    camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java   (with props)
    camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ssl/
    camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ssl/SSLEngineFactory.java   (with props)
    camel/trunk/components/camel-netty/src/main/resources/
    camel/trunk/components/camel-netty/src/main/resources/META-INF/
    camel/trunk/components/camel-netty/src/main/resources/META-INF/LICENSE.txt   (with props)
    camel/trunk/components/camel-netty/src/main/resources/META-INF/NOTICE.txt   (with props)
    camel/trunk/components/camel-netty/src/main/resources/META-INF/services/
    camel/trunk/components/camel-netty/src/main/resources/META-INF/services/org/
    camel/trunk/components/camel-netty/src/main/resources/META-INF/services/org/apache/
    camel/trunk/components/camel-netty/src/main/resources/META-INF/services/org/apache/camel/
    camel/trunk/components/camel-netty/src/main/resources/META-INF/services/org/apache/camel/component/
    camel/trunk/components/camel-netty/src/main/resources/META-INF/services/org/apache/camel/component/netty
    camel/trunk/components/camel-netty/src/test/
    camel/trunk/components/camel-netty/src/test/java/
    camel/trunk/components/camel-netty/src/test/java/org/
    camel/trunk/components/camel-netty/src/test/java/org/apache/
    camel/trunk/components/camel-netty/src/test/java/org/apache/camel/
    camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/
    camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/
    camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyConcurrentTest.java   (with props)
    camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettySSLTest.java   (with props)
    camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTCPAsyncTest.java   (with props)
    camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTCPSyncTest.java   (with props)
    camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyUDPAsyncTest.java   (with props)
    camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyUDPObjectSyncTest.java   (with props)
    camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyUDPSyncTest.java   (with props)
    camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/Poetry.java   (with props)
    camel/trunk/components/camel-netty/src/test/resources/
    camel/trunk/components/camel-netty/src/test/resources/log4j.properties   (with props)
    camel/trunk/components/camel-netty/src/test/resources/test.txt   (with props)
Modified:
    camel/trunk/components/pom.xml
    camel/trunk/parent/pom.xml

Added: camel/trunk/components/camel-netty/pom.xml
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/pom.xml?rev=927698&view=auto
==============================================================================
--- camel/trunk/components/camel-netty/pom.xml (added)
+++ camel/trunk/components/camel-netty/pom.xml Fri Mar 26 05:46:07 2010
@@ -0,0 +1,83 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+	<!--
+		Licensed to the Apache Software Foundation (ASF) under one or more
+		contributor license agreements. See the NOTICE file distributed with
+		this work for additional information regarding copyright ownership.
+		The ASF licenses this file to You under the Apache License, Version
+		2.0 (the "License"); you may not use this file except in compliance
+		with the License. You may obtain a copy of the License at
+
+		http://www.apache.org/licenses/LICENSE-2.0
+
+		Unless required by applicable law or agreed to in writing, software
+		distributed under the License is distributed on an "AS IS" BASIS,
+		WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+		implied. See the License for the specific language governing
+		permissions and limitations under the License.
+	-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.camel</groupId>
+		<artifactId>components</artifactId>
+		<version>2.3-SNAPSHOT</version>
+	</parent>
+
+	<artifactId>camel-netty</artifactId>
+	<packaging>bundle</packaging>
+	<name>Camel :: Netty</name>
+	<description>Camel Netty NIO based socket communication component</description>
+
+	<properties>
+		<camel.osgi.export.pkg>
+			org.apache.camel.component.netty.*,
+		</camel.osgi.export.pkg>
+	</properties>
+
+	<dependencies>
+		
+		<dependency>
+			<groupId>org.apache.camel</groupId>
+			<artifactId>camel-core</artifactId>
+		</dependency>
+        <dependency>
+            <groupId>org.jboss.netty</groupId>
+            <artifactId>netty</artifactId>
+            <version>${netty-version}</version>
+        </dependency>
+
+		<!-- testing -->
+		<dependency>
+			<groupId>org.apache.camel</groupId>
+			<artifactId>camel-test</artifactId>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>junit</groupId>
+			<artifactId>junit</artifactId>
+			<scope>test</scope>
+		</dependency>
+
+		<!-- logging -->
+		<dependency>
+			<groupId>org.slf4j</groupId>
+			<artifactId>slf4j-api</artifactId>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.slf4j</groupId>
+			<artifactId>slf4j-log4j12</artifactId>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>log4j</groupId>
+			<artifactId>log4j</artifactId>
+			<scope>test</scope>
+		</dependency>
+
+	</dependencies>
+
+</project>

Propchange: camel/trunk/components/camel-netty/pom.xml
------------------------------------------------------------------------------
    svn:eol-style = native

Added: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ClientPipelineFactory.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ClientPipelineFactory.java?rev=927698&view=auto
==============================================================================
--- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ClientPipelineFactory.java (added)
+++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ClientPipelineFactory.java Fri Mar 26 05:46:07 2010
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty;
+
+import javax.net.ssl.SSLEngine;
+
+import org.apache.camel.component.netty.handlers.ClientChannelHandler;
+import org.apache.camel.component.netty.ssl.SSLEngineFactory;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.handler.ssl.SslHandler;
+
+public class ClientPipelineFactory implements ChannelPipelineFactory {
+    private static final transient Log LOG = LogFactory.getLog(ClientPipelineFactory.class);
+    private NettyProducer producer;
+    private ChannelPipeline channelPipeline;
+
+    public ClientPipelineFactory(NettyProducer producer) {
+        this.producer = producer; 
+    }    
+    
+    public ChannelPipeline getPipeline() throws Exception {
+        if (channelPipeline != null) {
+            return channelPipeline;
+        }
+        
+        channelPipeline = Channels.pipeline();
+
+        SslHandler sslHandler = configureClientSSLOnDemand();
+        if (sslHandler != null) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Client SSL handler configured and added as an interceptor against the ChannelPipeline");
+            }
+            channelPipeline.addLast("ssl", sslHandler);            
+        }
+        
+        channelPipeline.addLast("decoder", producer.getConfiguration().getDecoder());
+        channelPipeline.addLast("encoder", producer.getConfiguration().getEncoder());
+        if (producer.getConfiguration().getHandler() != null) {
+            channelPipeline.addLast("handler", producer.getConfiguration().getHandler());
+        } else {
+            channelPipeline.addLast("handler", new ClientChannelHandler(producer));
+        }
+
+        return channelPipeline;
+    }
+
+    private SslHandler configureClientSSLOnDemand() throws Exception {
+        if (!producer.getConfiguration().isSsl()) {
+            return null;
+        }
+
+        if (producer.getConfiguration().getSslHandler() != null) {
+            return producer.getConfiguration().getSslHandler();
+        } else {
+            if (producer.getConfiguration().getKeyStoreFile() == null) {
+                LOG.debug("keystorefile is null");
+            } 
+            if (producer.getConfiguration().getTrustStoreFile() == null) {
+                LOG.debug("truststorefile is null");
+            }
+            if (producer.getConfiguration().getPassphrase().toCharArray() == null) {
+                LOG.debug("passphrase is null");
+            }
+            SSLEngineFactory sslEngineFactory = new SSLEngineFactory(
+                producer.getConfiguration().getKeyStoreFile(), 
+                producer.getConfiguration().getTrustStoreFile(), 
+                producer.getConfiguration().getPassphrase().toCharArray());
+            SSLEngine sslEngine = sslEngineFactory.createClientSSLEngine();
+            return new SslHandler(sslEngine);
+        }
+    }
+}

Propchange: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ClientPipelineFactory.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyComponent.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyComponent.java?rev=927698&view=auto
==============================================================================
--- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyComponent.java (added)
+++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyComponent.java Fri Mar 26 05:46:07 2010
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty;
+
+import java.net.URI;
+import java.util.Map;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Endpoint;
+import org.apache.camel.impl.DefaultComponent;
+
+/**
+ * Camel component that creates {@link NettyEndpoint}s. The part of the
+ * endpoint URI that follows the component prefix is parsed as a URI by
+ * {@link NettyConfiguration#parseURI}.
+ */
+public class NettyComponent extends DefaultComponent {
+
+    public NettyComponent() {
+    }
+
+    public NettyComponent(CamelContext context) {
+        super(context);
+    }
+
+    /**
+     * Creates a Netty endpoint for the given URI.
+     *
+     * @param uri        the full endpoint URI
+     * @param remaining  the URI without the component prefix; parsed with
+     *                   {@link URI} to obtain protocol, host and port
+     * @param parameters the endpoint parameters; reference parameters are
+     *                   resolved and removed while parsing, the rest are
+     *                   applied to the configuration as bean properties
+     * @return the new endpoint
+     * @throws Exception if the URI is malformed or names an unsupported protocol
+     */
+    @Override
+    protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
+        // Every endpoint gets its own configuration. Parsing into a single
+        // component-wide instance would let a later endpoint overwrite the
+        // protocol, host, port and options of the ones created before it.
+        NettyConfiguration endpointConfig = new NettyConfiguration();
+        endpointConfig.parseURI(new URI(remaining), parameters, this);
+
+        NettyEndpoint nettyEndpoint = new NettyEndpoint(remaining, this, endpointConfig);
+        setProperties(nettyEndpoint.getConfiguration(), parameters);
+        return nettyEndpoint;
+    }
+}
\ No newline at end of file

Propchange: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyComponent.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java?rev=927698&view=auto
==============================================================================
--- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java (added)
+++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java Fri Mar 26 05:46:07 2010
@@ -0,0 +1,304 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty;
+
+import java.io.File;
+import java.net.URI;
+import java.util.Map;
+
+import org.apache.camel.util.URISupport;
+import org.jboss.netty.channel.ChannelDownstreamHandler;
+import org.jboss.netty.channel.ChannelHandler;
+import org.jboss.netty.channel.ChannelUpstreamHandler;
+import org.jboss.netty.channel.SimpleChannelHandler;
+import org.jboss.netty.handler.codec.serialization.ObjectDecoder;
+import org.jboss.netty.handler.codec.serialization.ObjectEncoder;
+import org.jboss.netty.handler.ssl.SslHandler;
+
+public class NettyConfiguration {
+    private String protocol;
+    private String host;
+    private int port;
+    private boolean keepAlive;
+    private boolean tcpNoDelay;
+    private boolean broadcast;
+    private long connectTimeoutMillis;
+    private long receiveTimeoutMillis;
+    private boolean reuseAddress;
+    private boolean sync;
+    private String passphrase;
+    private File keyStoreFile;
+    private File trustStoreFile;
+    private SslHandler sslHandler;
+    private ChannelDownstreamHandler encoder;
+    private ChannelUpstreamHandler decoder;
+    private ChannelHandler handler;
+    private boolean ssl;
+    private long sendBufferSize;
+    private long receiveBufferSize;
+    private int corePoolSize;
+    private int maxPoolSize;
+
+    public NettyConfiguration() {
+        setKeepAlive(true);
+        setTcpNoDelay(true);
+        setBroadcast(false);
+        setReuseAddress(true);
+        setSync(false);
+        setConnectTimeoutMillis(10000);
+        setReceiveTimeoutMillis(10000);
+        setSendBufferSize(65536);
+        setReceiveBufferSize(65536);
+        setSsl(false);
+        setCorePoolSize(10);
+        setMaxPoolSize(100);
+    }
+    
+    public void parseURI(URI uri, Map<String, Object> parameters, NettyComponent component) throws Exception {
+        protocol = uri.getScheme();
+        
+        if ((!protocol.equalsIgnoreCase("tcp")) && (!protocol.equalsIgnoreCase("udp"))) {
+            throw new IllegalArgumentException("Unrecognized Netty protocol: " + protocol + " for uri: " + uri);
+        }
+        
+        setHost(uri.getHost());
+        setPort(uri.getPort());
+     
+        sslHandler = component.resolveAndRemoveReferenceParameter(parameters, "sslHandler", SslHandler.class, null);
+        passphrase = component.resolveAndRemoveReferenceParameter(parameters, "passphrase", String.class, null);
+        keyStoreFile = component.resolveAndRemoveReferenceParameter(parameters, "keyStoreFile", File.class, null);
+        trustStoreFile = component.resolveAndRemoveReferenceParameter(parameters, "trustStoreFile", File.class, null);
+        encoder = component.resolveAndRemoveReferenceParameter(parameters, "encoder", ChannelDownstreamHandler.class, new ObjectEncoder());
+        decoder = component.resolveAndRemoveReferenceParameter(parameters, "decoder", ChannelUpstreamHandler.class, new ObjectDecoder());
+        handler = component.resolveAndRemoveReferenceParameter(parameters, "handler", SimpleChannelHandler.class, null);        
+        
+        Map<String, Object> settings = URISupport.parseParameters(uri);
+        if (settings.containsKey("keepAlive")) {
+            setKeepAlive(Boolean.valueOf((String) settings.get("keepAlive")));
+        }         
+        if (settings.containsKey("tcpNoDelay")) {
+            setTcpNoDelay(Boolean.valueOf((String) settings.get("tcpNoDelay")));
+        }        
+        if (settings.containsKey("broadcast")) {
+            setBroadcast(Boolean.valueOf((String) settings.get("broadcast")));
+        }            
+        if (settings.containsKey("reuseAddress")) {
+            setReuseAddress(Boolean.valueOf((String) settings.get("reuseAddress")));
+        }
+        if (settings.containsKey("connectTimeoutMillis")) {
+            setConnectTimeoutMillis(Long.valueOf((String)settings.get("connectTimeoutMillis")));
+        }
+        if (settings.containsKey("sync")) {
+            setTcpNoDelay(Boolean.valueOf((String) settings.get("sync")));
+        }
+        if (settings.containsKey("receiveTimeoutMillis")) {
+            setReceiveTimeoutMillis(Long.valueOf((String)settings.get("receiveTimeoutMillis")));
+        }
+        if (settings.containsKey("sendBufferSize")) {
+            setSendBufferSize(Long.valueOf((String)settings.get("sendBufferSize")));
+        }
+        if (settings.containsKey("receiveBufferSize")) {
+            setReceiveBufferSize(Long.valueOf((String)settings.get("receiveBufferSize")));
+        }        
+        if (settings.containsKey("ssl")) {
+            setTcpNoDelay(Boolean.valueOf((String) settings.get("ssl")));
+        }
+        if (settings.containsKey("corePoolSize")) {
+            setCorePoolSize(Integer.valueOf((String)settings.get("corePoolSize")));
+        }
+        if (settings.containsKey("maxPoolSize")) {
+            setMaxPoolSize(Integer.valueOf((String)settings.get("maxPoolSize")));
+        }
+    }
+
+    public String getProtocol() {
+        return protocol;
+    }
+
+    public void setProtocol(String protocol) {
+        this.protocol = protocol;
+    }
+
+    public String getHost() {
+        return host;
+    }
+
+    public void setHost(String host) {
+        this.host = host;
+    }
+
+    public int getPort() {
+        return port;
+    }
+
+    public void setPort(int port) {
+        this.port = port;
+    }
+
+    public boolean isKeepAlive() {
+        return keepAlive;
+    }
+
+    public void setKeepAlive(boolean keepAlive) {
+        this.keepAlive = keepAlive;
+    }
+
+    public boolean isTcpNoDelay() {
+        return tcpNoDelay;
+    }
+
+    public void setTcpNoDelay(boolean tcpNoDelay) {
+        this.tcpNoDelay = tcpNoDelay;
+    }
+   
+    public boolean isBroadcast() {
+        return broadcast;
+    }
+
+    public void setBroadcast(boolean broadcast) {
+        this.broadcast = broadcast;
+    }
+
+    public long getConnectTimeoutMillis() {
+        return connectTimeoutMillis;
+    }
+
+    public void setConnectTimeoutMillis(long connectTimeoutMillis) {
+        this.connectTimeoutMillis = connectTimeoutMillis;
+    }
+
+    public boolean isReuseAddress() {
+        return reuseAddress;
+    }
+
+    public void setReuseAddress(boolean reuseAddress) {
+        this.reuseAddress = reuseAddress;
+    }
+
+    public boolean isSync() {
+        return sync;
+    }
+
+    public void setSync(boolean sync) {
+        this.sync = sync;
+    }
+
+    public SslHandler getSslHandler() {
+        return sslHandler;
+    }
+
+    public void setSslHandler(SslHandler sslHandler) {
+        this.sslHandler = sslHandler;
+    }
+
+    public ChannelDownstreamHandler getEncoder() {
+        return encoder;
+    }
+
+    public void setEncoder(ChannelDownstreamHandler encoder) {
+        this.encoder = encoder;
+    }
+
+    public ChannelUpstreamHandler getDecoder() {
+        return decoder;
+    }
+
+    public void setDecoder(ChannelUpstreamHandler decoder) {
+        this.decoder = decoder;
+    }
+
+    public ChannelHandler getHandler() {
+        return handler;
+    }
+
+    public void setHandler(ChannelHandler handler) {
+        this.handler = handler;
+    }
+
+    public long getReceiveTimeoutMillis() {
+        return receiveTimeoutMillis;
+    }
+
+    public void setReceiveTimeoutMillis(long receiveTimeoutMillis) {
+        this.receiveTimeoutMillis = receiveTimeoutMillis;
+    }
+
+    public long getSendBufferSize() {
+        return sendBufferSize;
+    }
+
+    public void setSendBufferSize(long sendBufferSize) {
+        this.sendBufferSize = sendBufferSize;
+    }
+    
+    public boolean isSsl() {
+        return ssl;
+    }
+
+    public void setSsl(boolean ssl) {
+        this.ssl = ssl;
+    }
+
+    public long getReceiveBufferSize() {
+        return receiveBufferSize;
+    }
+
+    public void setReceiveBufferSize(long receiveBufferSize) {
+        this.receiveBufferSize = receiveBufferSize;
+    }
+
+    public String getPassphrase() {
+        return passphrase;
+    }
+
+    public void setPassphrase(String passphrase) {
+        this.passphrase = passphrase;
+    }
+
+    public File getKeyStoreFile() {
+        return keyStoreFile;
+    }
+
+    public void setKeyStoreFile(File keyStoreFile) {
+        this.keyStoreFile = keyStoreFile;
+    }
+
+    public File getTrustStoreFile() {
+        return trustStoreFile;
+    }
+
+    public void setTrustStoreFile(File trustStoreFile) {
+        this.trustStoreFile = trustStoreFile;
+    }
+
+    public int getCorePoolSize() {
+        return corePoolSize;
+    }
+
+    public void setCorePoolSize(int corePoolSize) {
+        this.corePoolSize = corePoolSize;
+    }
+
+    public int getMaxPoolSize() {
+        return maxPoolSize;
+    }
+
+    public void setMaxPoolSize(int maxPoolSize) {
+        this.maxPoolSize = maxPoolSize;
+    }    
+
+}

Propchange: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java?rev=927698&view=auto
==============================================================================
--- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java (added)
+++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java Fri Mar 26 05:46:07 2010
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.ExecutorService;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Processor;
+import org.apache.camel.impl.DefaultConsumer;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.jboss.netty.bootstrap.ConnectionlessBootstrap;
+import org.jboss.netty.bootstrap.ServerBootstrap;
+import org.jboss.netty.channel.ChannelFactory;
+import org.jboss.netty.channel.socket.DatagramChannelFactory;
+import org.jboss.netty.channel.socket.nio.NioDatagramChannelFactory;
+import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
+
+public class NettyConsumer extends DefaultConsumer {
+    private static final transient Log LOG = LogFactory.getLog(NettyConsumer.class);
+    private CamelContext context;
+    private NettyConfiguration configuration;
+    private ChannelFactory channelFactory;
+    private DatagramChannelFactory datagramChannelFactory;
+    private ServerBootstrap serverBootstrap;
+    private ConnectionlessBootstrap connectionlessServerBootstrap;
+    
+    public NettyConsumer(NettyEndpoint nettyEndpoint, Processor processor,
+        NettyConfiguration configuration) {
+        super(nettyEndpoint, processor);
+        this.configuration = nettyEndpoint.getConfiguration();
+        this.context = this.getEndpoint().getCamelContext();
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        super.doStart();
+        if (configuration.getProtocol().equalsIgnoreCase("udp")) {
+            initializeUDPServerSocketCommunicationLayer();
+        } else {
+            initializeTCPServerSocketCommunicationLayer();
+        }
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        super.doStop(); 
+    }
+    
+    private void initializeTCPServerSocketCommunicationLayer() throws Exception {
+        ExecutorService bossExecutor = 
+            context.getExecutorServiceStrategy().newThreadPool(this, "NettyTCPBoss", configuration.getCorePoolSize(), configuration.getMaxPoolSize());
+        ExecutorService workerExecutor = 
+            context.getExecutorServiceStrategy().newThreadPool(this, "NettyTCPWorker", configuration.getCorePoolSize(), configuration.getMaxPoolSize());
+        channelFactory = new NioServerSocketChannelFactory(bossExecutor, workerExecutor);
+        serverBootstrap = new ServerBootstrap(channelFactory);
+        
+        serverBootstrap.setPipelineFactory(new ServerPipelineFactory(this));
+        serverBootstrap.setOption("child.keepAlive", configuration.isKeepAlive());
+        serverBootstrap.setOption("child.tcpNoDelay", configuration.isTcpNoDelay());
+        serverBootstrap.setOption("child.reuseAddress", configuration.isReuseAddress());
+        serverBootstrap.setOption("child.connectTimeoutMillis", configuration.getConnectTimeoutMillis());
+        serverBootstrap.bind(new InetSocketAddress(configuration.getHost(), configuration.getPort()));        
+        LOG.info("Netty TCP Consumer started and now listening on Host: " + configuration.getHost() + " Port: " + configuration.getPort());
+    }
+
+    private void initializeUDPServerSocketCommunicationLayer() throws Exception {
+        ExecutorService workerExecutor = context.getExecutorServiceStrategy().newThreadPool(this, "NettyUDPWorker", configuration.getCorePoolSize(), configuration.getMaxPoolSize());
+        datagramChannelFactory = new NioDatagramChannelFactory(workerExecutor);        
+        connectionlessServerBootstrap = new ConnectionlessBootstrap(datagramChannelFactory);
+        
+        connectionlessServerBootstrap.setPipelineFactory(new ServerPipelineFactory(this));
+        connectionlessServerBootstrap.setOption("child.keepAlive", configuration.isKeepAlive());
+        connectionlessServerBootstrap.setOption("child.tcpNoDelay", configuration.isTcpNoDelay());
+        connectionlessServerBootstrap.setOption("child.reuseAddress", configuration.isReuseAddress());
+        connectionlessServerBootstrap.setOption("child.connectTimeoutMillis", configuration.getConnectTimeoutMillis());
+        connectionlessServerBootstrap.setOption("child.broadcast", configuration.isBroadcast());
+        connectionlessServerBootstrap.setOption("sendBufferSize", configuration.getSendBufferSize());
+        connectionlessServerBootstrap.setOption("receiveBufferSize", configuration.getReceiveBufferSize());
+        connectionlessServerBootstrap.bind(new InetSocketAddress(configuration.getHost(), configuration.getPort()));
+        LOG.info("Netty UDP Consumer started and now listening on Host: " + configuration.getHost() + " Port: " + configuration.getPort());
+    }
+    
+    public NettyConfiguration getConfiguration() {
+        return configuration;
+    }
+
+    public void setConfiguration(NettyConfiguration configuration) {
+        this.configuration = configuration;
+    }
+
+    public ChannelFactory getChannelFactory() {
+        return channelFactory;
+    }
+
+    public void setChannelFactory(ChannelFactory channelFactory) {
+        this.channelFactory = channelFactory;
+    }
+
+
+    public DatagramChannelFactory getDatagramChannelFactory() {
+        return datagramChannelFactory;
+    }
+
+    public void setDatagramChannelFactory(
+        DatagramChannelFactory datagramChannelFactory) {
+        this.datagramChannelFactory = datagramChannelFactory;
+    } 
+    
+    public ServerBootstrap getServerBootstrap() {
+        return serverBootstrap;
+    }
+
+    public void setServerBootstrap(ServerBootstrap serverBootstrap) {
+        this.serverBootstrap = serverBootstrap;
+    }
+
+    public ConnectionlessBootstrap getConnectionlessServerBootstrap() {
+        return connectionlessServerBootstrap;
+    }
+
+    public void setConnectionlessServerBootstrap(
+            ConnectionlessBootstrap connectionlessServerBootstrap) {
+        this.connectionlessServerBootstrap = connectionlessServerBootstrap;
+    } 
+    
+}

Propchange: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyEndpoint.java?rev=927698&view=auto
==============================================================================
--- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyEndpoint.java (added)
+++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyEndpoint.java Fri Mar 26 05:46:07 2010
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty;
+
+import org.apache.camel.Component;
+import org.apache.camel.Consumer;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.impl.DefaultEndpoint;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.MessageEvent;
+
+/**
+ * Endpoint of the Netty component. It keeps the {@link NettyConfiguration} it
+ * was created with and passes it on to every consumer and producer it creates.
+ */
+public class NettyEndpoint extends DefaultEndpoint {
+    private NettyConfiguration configuration;
+
+    /**
+     * @param endpointUri   the uri this endpoint was created from
+     * @param component     the owning component
+     * @param configuration the configuration handed to created consumers and producers
+     */
+    public NettyEndpoint(String endpointUri, Component component, NettyConfiguration configuration) {
+        super(endpointUri, component);
+        this.configuration = configuration;
+    }
+
+    public NettyConfiguration getConfiguration() {
+        return configuration;
+    }
+
+    public void setConfiguration(NettyConfiguration configuration) {
+        this.configuration = configuration;
+    }
+
+    /**
+     * This endpoint does not declare itself a singleton.
+     */
+    public boolean isSingleton() {
+        return false;
+    }
+
+    /**
+     * Creates a producer bound to this endpoint and its current configuration.
+     */
+    public Producer createProducer() throws Exception {
+        return new NettyProducer(this, configuration);
+    }
+
+    /**
+     * Creates a consumer that feeds the given processor, bound to this endpoint
+     * and its current configuration.
+     */
+    public Consumer createConsumer(Processor processor) throws Exception {
+        return new NettyConsumer(this, processor, configuration);
+    }
+
+    /**
+     * Creates a new exchange whose IN message carries the Netty handler context
+     * and message event as the headers <tt>NettyChannelHandlerContext</tt> and
+     * <tt>NettyMessageEvent</tt>.
+     *
+     * @param ctx          the handler context stored as a header
+     * @param messageEvent the message event stored as a header
+     * @return the newly created exchange
+     */
+    public Exchange createExchange(ChannelHandlerContext ctx, MessageEvent messageEvent) {
+        final Exchange answer = createExchange();
+        answer.getIn().setHeader("NettyChannelHandlerContext", ctx);
+        answer.getIn().setHeader("NettyMessageEvent", messageEvent);
+        return answer;
+    }
+
+}
\ No newline at end of file

Propchange: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyEndpoint.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java?rev=927698&view=auto
==============================================================================
--- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java (added)
+++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java Fri Mar 26 05:46:07 2010
@@ -0,0 +1,218 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangeTimedOutException;
+import org.apache.camel.ServicePoolAware;
+import org.apache.camel.component.netty.handlers.ClientChannelHandler;
+import org.apache.camel.impl.DefaultProducer;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.jboss.netty.bootstrap.ClientBootstrap;
+import org.jboss.netty.bootstrap.ConnectionlessBootstrap;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFactory;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.socket.DatagramChannelFactory;
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
+import org.jboss.netty.channel.socket.nio.NioDatagramChannelFactory;
+
+/**
+ * Producer that writes the IN body of each exchange to a Netty channel. The
+ * channel is opened in {@link #doStart()} (UDP when the configured protocol is
+ * "udp", TCP otherwise) and closed again in {@link #doStop()}. When the
+ * configuration is marked as sync, {@link #process(Exchange)} waits until the
+ * latch returned by {@link #getCountdownLatch()} has been counted down and then
+ * copies the reply held by the pipeline's "handler" entry to the OUT body.
+ */
+public class NettyProducer extends DefaultProducer implements ServicePoolAware {
+    private static final transient Log LOG = LogFactory.getLog(NettyProducer.class);
+    private CamelContext context;
+    private NettyConfiguration configuration;
+    private CountDownLatch countdownLatch;
+    private ChannelFactory channelFactory;
+    private DatagramChannelFactory datagramChannelFactory;
+    private ChannelFuture channelFuture;
+    private ClientBootstrap clientBootstrap;
+    private ConnectionlessBootstrap connectionlessClientBootstrap;
+    private ClientPipelineFactory clientPipelineFactory;
+    private ChannelPipeline clientPipeline;
+    // channel returned by the UDP bind(); kept only so that it can be closed on stop
+    private Channel boundDatagramChannel;
+
+    public NettyProducer(NettyEndpoint nettyEndpoint, NettyConfiguration configuration) {
+        super(nettyEndpoint);
+        this.configuration = configuration;
+        this.context = this.getEndpoint().getCamelContext();
+    }
+
+    /**
+     * Opens the channel to the configured host and port.
+     *
+     * @throws Exception if the channel cannot be set up or connected
+     */
+    @Override
+    protected void doStart() throws Exception {
+        super.doStart();
+        if (configuration.getProtocol().equalsIgnoreCase("udp")) {
+            setupUDPCommunication();
+        } else {
+            setupTCPCommunication();
+        }
+    }
+
+    /**
+     * Closes the channels opened on start and releases the channel factories,
+     * so that no socket or thread outlives the producer.
+     */
+    @Override
+    protected void doStop() throws Exception {
+        releaseResources();
+        super.doStop();
+    }
+
+    @Override
+    public boolean isSingleton() {
+        return false;
+    }
+
+    /**
+     * Writes the IN body to the channel. In sync mode the call blocks until the
+     * latch is counted down or the receive timeout elapses, and the received
+     * reply becomes the OUT body.
+     *
+     * @throws ExchangeTimedOutException if no reply arrives within the receive timeout
+     */
+    public void process(Exchange exchange) throws Exception {
+        if (configuration.isSync()) {
+            // the latch must exist before the write so that a fast reply can count it down
+            countdownLatch = new CountDownLatch(1);
+        }
+
+        Channel channel = channelFuture.getChannel();
+        channel.write(exchange.getIn().getBody());
+
+        if (configuration.isSync()) {
+            boolean success = countdownLatch.await(configuration.getReceiveTimeoutMillis(), TimeUnit.MILLISECONDS);
+            if (!success) {
+                throw new ExchangeTimedOutException(exchange, configuration.getReceiveTimeoutMillis());
+            }
+            Object response = ((ClientChannelHandler) clientPipeline.get("handler")).getResponse();
+            exchange.getOut().setBody(response);
+        }
+    }
+
+    /**
+     * Creates whatever TCP plumbing has not been injected through the setters
+     * and connects to the configured host and port.
+     *
+     * @throws Exception if the pipeline cannot be created or the connection attempt fails
+     */
+    public void setupTCPCommunication() throws Exception {
+        if (channelFactory == null) {
+            ExecutorService bossExecutor =
+                context.getExecutorServiceStrategy().newThreadPool(this,
+                    "NettyTCPBoss",
+                    configuration.getCorePoolSize(),
+                    configuration.getMaxPoolSize());
+            ExecutorService workerExecutor =
+                context.getExecutorServiceStrategy().newThreadPool(this,
+                    "NettyTCPWorker",
+                    configuration.getCorePoolSize(),
+                    configuration.getMaxPoolSize());
+            channelFactory = new NioClientSocketChannelFactory(bossExecutor, workerExecutor);
+        }
+        if (clientBootstrap == null) {
+            clientBootstrap = new ClientBootstrap(channelFactory);
+            // A client bootstrap applies its options to the channel it creates. The "child."
+            // prefix is only interpreted by ServerBootstrap (for accepted channels), so
+            // prefixed keys would never reach the socket.
+            clientBootstrap.setOption("keepAlive", configuration.isKeepAlive());
+            clientBootstrap.setOption("tcpNoDelay", configuration.isTcpNoDelay());
+            clientBootstrap.setOption("reuseAddress", configuration.isReuseAddress());
+            clientBootstrap.setOption("connectTimeoutMillis", configuration.getConnectTimeoutMillis());
+        }
+        clientBootstrap.setPipeline(prepareClientPipeline());
+        channelFuture = clientBootstrap.connect(new InetSocketAddress(configuration.getHost(), configuration.getPort()));
+        awaitConnected("TCP");
+        LOG.info("Netty TCP Producer started and now listening on Host: " + configuration.getHost() + " Port : " + configuration.getPort());
+    }
+
+    /**
+     * Creates whatever UDP plumbing has not been injected through the setters,
+     * binds to an ephemeral local port and connects to the configured host and port.
+     *
+     * @throws Exception if the pipeline cannot be created or the connection attempt fails
+     */
+    public void setupUDPCommunication() throws Exception {
+        if (datagramChannelFactory == null) {
+            ExecutorService workerExecutor =
+                context.getExecutorServiceStrategy().newThreadPool(this,
+                    "NettyUDPWorker",
+                    configuration.getCorePoolSize(),
+                    configuration.getMaxPoolSize());
+            datagramChannelFactory = new NioDatagramChannelFactory(workerExecutor);
+        }
+        if (connectionlessClientBootstrap == null) {
+            connectionlessClientBootstrap = new ConnectionlessBootstrap(datagramChannelFactory);
+            // Same as for TCP: no "child." prefix on a client-side bootstrap.
+            // NOTE(review): keepAlive/tcpNoDelay are TCP options and are presumably ignored
+            // by datagram channels - confirm against the Netty version in use.
+            connectionlessClientBootstrap.setOption("keepAlive", configuration.isKeepAlive());
+            connectionlessClientBootstrap.setOption("tcpNoDelay", configuration.isTcpNoDelay());
+            connectionlessClientBootstrap.setOption("reuseAddress", configuration.isReuseAddress());
+            connectionlessClientBootstrap.setOption("connectTimeoutMillis", configuration.getConnectTimeoutMillis());
+            connectionlessClientBootstrap.setOption("broadcast", configuration.isBroadcast());
+            connectionlessClientBootstrap.setOption("sendBufferSize", configuration.getSendBufferSize());
+            connectionlessClientBootstrap.setOption("receiveBufferSize", configuration.getReceiveBufferSize());
+        }
+        connectionlessClientBootstrap.setPipeline(prepareClientPipeline());
+        // remember the bound channel so that it can be closed when the producer stops
+        boundDatagramChannel = connectionlessClientBootstrap.bind(new InetSocketAddress(0));
+        channelFuture = connectionlessClientBootstrap.connect(new InetSocketAddress(configuration.getHost(), configuration.getPort()));
+        awaitConnected("UDP");
+        LOG.info("Netty UDP Producer started and now listening on Host: " + configuration.getHost() + " Port : " + configuration.getPort());
+    }
+
+    /**
+     * Makes sure a pipeline factory and a pipeline exist and returns the pipeline.
+     * The pipeline is always handed back, even when the factory was injected, so
+     * that the bootstrap never runs with an empty pipeline.
+     */
+    private ChannelPipeline prepareClientPipeline() throws Exception {
+        if (clientPipelineFactory == null) {
+            clientPipelineFactory = new ClientPipelineFactory(this);
+        }
+        if (clientPipeline == null) {
+            clientPipeline = clientPipelineFactory.getPipeline();
+        }
+        return clientPipeline;
+    }
+
+    /**
+     * Waits for the pending connection attempt and fails when it did not succeed.
+     */
+    private void awaitConnected(String transport) throws Exception {
+        channelFuture.awaitUninterruptibly();
+        if (!channelFuture.isSuccess()) {
+            Throwable cause = channelFuture.getCause();
+            // do not keep sockets and threads around for a producer that failed to start
+            releaseResources();
+            throw new org.apache.camel.CamelException("Netty " + transport + " Producer cannot connect to Host: "
+                + configuration.getHost() + " Port : " + configuration.getPort(), cause);
+        }
+    }
+
+    /**
+     * Closes the open channels and releases the channel factories. The released
+     * factories and their bootstraps are cleared so that a later start builds new ones.
+     */
+    private void releaseResources() {
+        if (channelFuture != null) {
+            channelFuture.getChannel().close().awaitUninterruptibly();
+            channelFuture = null;
+        }
+        if (boundDatagramChannel != null) {
+            boundDatagramChannel.close().awaitUninterruptibly();
+            boundDatagramChannel = null;
+        }
+        if (channelFactory != null) {
+            channelFactory.releaseExternalResources();
+            channelFactory = null;
+            clientBootstrap = null;
+        }
+        if (datagramChannelFactory != null) {
+            datagramChannelFactory.releaseExternalResources();
+            datagramChannelFactory = null;
+            connectionlessClientBootstrap = null;
+        }
+    }
+
+    public NettyConfiguration getConfiguration() {
+        return configuration;
+    }
+
+    public void setConfiguration(NettyConfiguration configuration) {
+        this.configuration = configuration;
+    }
+
+    public CountDownLatch getCountdownLatch() {
+        return countdownLatch;
+    }
+
+    public void setCountdownLatch(CountDownLatch countdownLatch) {
+        this.countdownLatch = countdownLatch;
+    }
+
+    public ChannelFactory getChannelFactory() {
+        return channelFactory;
+    }
+
+    public void setChannelFactory(ChannelFactory channelFactory) {
+        this.channelFactory = channelFactory;
+    }
+
+    public ChannelFuture getChannelFuture() {
+        return channelFuture;
+    }
+
+    public void setChannelFuture(ChannelFuture channelFuture) {
+        this.channelFuture = channelFuture;
+    }
+
+    public ClientBootstrap getClientBootstrap() {
+        return clientBootstrap;
+    }
+
+    public void setClientBootstrap(ClientBootstrap clientBootstrap) {
+        this.clientBootstrap = clientBootstrap;
+    }
+
+    public ClientPipelineFactory getClientPipelineFactory() {
+        return clientPipelineFactory;
+    }
+
+    public void setClientPipelineFactory(ClientPipelineFactory clientPipelineFactory) {
+        this.clientPipelineFactory = clientPipelineFactory;
+    }
+
+    public ChannelPipeline getClientPipeline() {
+        return clientPipeline;
+    }
+
+    public void setClientPipeline(ChannelPipeline clientPipeline) {
+        this.clientPipeline = clientPipeline;
+    }
+
+}

Propchange: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ServerPipelineFactory.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ServerPipelineFactory.java?rev=927698&view=auto
==============================================================================
--- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ServerPipelineFactory.java (added)
+++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ServerPipelineFactory.java Fri Mar 26 05:46:07 2010
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty;
+
+import javax.net.ssl.SSLEngine;
+
+import org.apache.camel.component.netty.handlers.ServerChannelHandler;
+import org.apache.camel.component.netty.ssl.SSLEngineFactory;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.handler.ssl.SslHandler;
+
+/**
+ * Pipeline factory for the consumer side. Each pipeline consists of an optional
+ * "ssl" handler followed by the configured "decoder" and "encoder" and finally
+ * the "handler" entry, which is the configured handler or, when none is
+ * configured, a new {@link ServerChannelHandler}.
+ */
+public class ServerPipelineFactory implements ChannelPipelineFactory {
+    private static final transient Log LOG = LogFactory.getLog(ServerPipelineFactory.class);
+    private NettyConsumer consumer;
+
+    public ServerPipelineFactory(NettyConsumer consumer) {
+        this.consumer = consumer;
+    }
+
+    /**
+     * Assembles a new pipeline from the consumer's configuration.
+     *
+     * @return the assembled pipeline
+     * @throws Exception if the SSL handler cannot be created
+     */
+    public ChannelPipeline getPipeline() throws Exception {
+        ChannelPipeline pipeline = Channels.pipeline();
+
+        // SSL, when enabled, has to sit in front of every other handler
+        SslHandler ssl = configureServerSSLOnDemand();
+        if (ssl != null) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Server SSL handler configured and added as an interceptor against the ChannelPipeline");
+            }
+            pipeline.addLast("ssl", ssl);
+        }
+
+        pipeline.addLast("decoder", consumer.getConfiguration().getDecoder());
+        pipeline.addLast("encoder", consumer.getConfiguration().getEncoder());
+
+        if (consumer.getConfiguration().getHandler() == null) {
+            pipeline.addLast("handler", new ServerChannelHandler(consumer));
+        } else {
+            pipeline.addLast("handler", consumer.getConfiguration().getHandler());
+        }
+
+        return pipeline;
+    }
+
+    /**
+     * Returns the SSL handler to install, or <tt>null</tt> when SSL is switched
+     * off. A handler set on the configuration is used as is; otherwise one is
+     * built from the configured key store, trust store and passphrase.
+     */
+    private SslHandler configureServerSSLOnDemand() throws Exception {
+        if (!consumer.getConfiguration().isSsl()) {
+            return null;
+        }
+        if (consumer.getConfiguration().getSslHandler() != null) {
+            return consumer.getConfiguration().getSslHandler();
+        }
+
+        SSLEngineFactory engineFactory = new SSLEngineFactory(
+                consumer.getConfiguration().getKeyStoreFile(),
+                consumer.getConfiguration().getTrustStoreFile(),
+                consumer.getConfiguration().getPassphrase().toCharArray());
+        SSLEngine serverEngine = engineFactory.createServerSSLEngine();
+        return new SslHandler(serverEngine);
+    }
+
+}

Propchange: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ServerPipelineFactory.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java?rev=927698&view=auto
==============================================================================
--- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java (added)
+++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java Fri Mar 26 05:46:07 2010
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty.handlers;
+
+import org.apache.camel.component.netty.NettyProducer;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelPipelineCoverage;
+import org.jboss.netty.channel.ExceptionEvent;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+
+/**
+ * Upstream handler used on the producer side. It remembers the most recently
+ * received message and, when the producer's configuration is marked as sync,
+ * counts down the producer's latch so that the waiting caller can pick the
+ * message up through {@link #getResponse()}.
+ */
+@ChannelPipelineCoverage("all")
+public class ClientChannelHandler extends SimpleChannelUpstreamHandler {
+    private static final transient Log LOG = LogFactory.getLog(ClientChannelHandler.class);
+    private NettyProducer producer;
+    private Object response;
+
+    public ClientChannelHandler(NettyProducer producer) {
+        super();
+        this.producer = producer;
+    }
+
+    /**
+     * Logs the failure. It is reported at WARN level because nothing else
+     * surfaces it: a sync caller would otherwise only see a timeout.
+     */
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent exceptionEvent)
+        throws Exception {
+        LOG.warn("An exception was caught by the ClientChannelHandler during communication", exceptionEvent.getCause());
+    }
+
+    /**
+     * Stores the received message and, in sync mode, releases the waiting producer.
+     */
+    @Override
+    public void messageReceived(ChannelHandlerContext ctx, MessageEvent messageEvent)
+        throws Exception {
+        response = messageEvent.getMessage();
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Incoming message:" + response);
+        }
+        if (producer.getConfiguration().isSync()) {
+            // the producer may not have a latch yet when a message arrives before its
+            // first request, so there is nobody to release in that case
+            java.util.concurrent.CountDownLatch latch = producer.getCountdownLatch();
+            if (latch != null) {
+                latch.countDown();
+            }
+        }
+    }
+
+    /**
+     * @return the most recently received message, or <tt>null</tt> when none was stored
+     */
+    public Object getResponse() {
+        return response;
+    }
+
+    public void setResponse(Object response) {
+        this.response = response;
+    }
+
+}

Propchange: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java?rev=927698&view=auto
==============================================================================
--- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java (added)
+++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java Fri Mar 26 05:46:07 2010
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty.handlers;
+
+import java.net.InetSocketAddress;
+
+import org.apache.camel.CamelExchangeException;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.component.netty.NettyConsumer;
+import org.apache.camel.component.netty.NettyEndpoint;
+import org.apache.camel.util.ExchangeHelper;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelPipelineCoverage;
+import org.jboss.netty.channel.ExceptionEvent;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+
+/**
+ * Upstream handler used on the consumer side. Every received message is put
+ * into a new exchange and handed to the consumer's processor; when the
+ * configuration is marked as sync, the outcome of the exchange is written back
+ * to the channel the message arrived on.
+ */
+@ChannelPipelineCoverage("all")
+public class ServerChannelHandler extends SimpleChannelUpstreamHandler {
+    private static final transient Log LOG = LogFactory.getLog(ServerChannelHandler.class);
+    private NettyConsumer consumer;
+
+    public ServerChannelHandler(NettyConsumer consumer) {
+        super();
+        this.consumer = consumer;
+    }
+
+    /**
+     * Logs the failure. It is reported at WARN level because this is the only
+     * place where errors raised while handling a message become visible.
+     */
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent exceptionEvent)
+        throws Exception {
+        LOG.warn("An exception was caught by the ServerChannelHandler during communication", exceptionEvent.getCause());
+    }
+
+    /**
+     * Dispatches the received message along the route.
+     */
+    @Override
+    public void messageReceived(ChannelHandlerContext ctx, MessageEvent messageEvent)
+        throws Exception {
+        Object in = messageEvent.getMessage();
+        if (LOG.isDebugEnabled()) {
+            // only the logged text is converted; the payload itself must not depend on the log level
+            LOG.debug("Incoming message: " + toLoggable(in));
+        }
+
+        // Dispatch exchange along the route and receive the final resulting exchange
+        dispatchExchange(ctx, messageEvent, in);
+    }
+
+    /**
+     * Writes the outcome of the exchange back to the sender: the exception or
+     * fault body of a failed exchange, otherwise the OUT body (or the IN body
+     * when the exchange is not out capable). Without a body the channel is
+     * closed and nothing is written.
+     */
+    private void sendResponsetoChannel(final MessageEvent messageEvent, final Exchange exchange) throws Exception {
+        Object body;
+        if (ExchangeHelper.isOutCapable(exchange)) {
+            body = exchange.getOut().getBody();
+        } else {
+            body = exchange.getIn().getBody();
+        }
+
+        if (exchange.isFailed()) {
+            if (exchange.getException() == null) {
+                // fault detected
+                body = exchange.getOut().getBody();
+            } else {
+                body = exchange.getException();
+            }
+        }
+
+        if (body == null) {
+            LOG.warn("No Outbound Response received following route completion: " + exchange);
+            LOG.warn("A response cannot be sent to the Client");
+            messageEvent.getChannel().close();
+            // there is nothing to write, and the channel is being closed
+            return;
+        }
+
+        ChannelFuture future;
+        if (consumer.getConfiguration().getProtocol().equalsIgnoreCase("udp")) {
+            future = messageEvent.getChannel().write(body, messageEvent.getRemoteAddress());
+        } else {
+            future = messageEvent.getChannel().write(body);
+        }
+
+        // The write completes asynchronously, so its result is only known once the future
+        // is done. Waiting for it here could block the thread that has to perform the
+        // write, so the outcome is checked in a listener instead.
+        future.addListener(new org.jboss.netty.channel.ChannelFutureListener() {
+            public void operationComplete(ChannelFuture completed) throws Exception {
+                if (!completed.isSuccess()) {
+                    LOG.warn("Could not send response via Channel to " + describeRemote(messageEvent)
+                        + ". Exchange: " + exchange, completed.getCause());
+                }
+            }
+        });
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Sent Outgoing message: " + toLoggable(body));
+        }
+    }
+
+    /**
+     * Creates the exchange for a received message, runs it through the
+     * consumer's processor and, in sync mode, sends the response back.
+     *
+     * @throws CamelExchangeException if the processor throws; the original exception is kept as cause
+     */
+    private void dispatchExchange(ChannelHandlerContext ctx, MessageEvent messageEvent, Object in) throws Exception {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Consumer Dispatching the Incoming exchange along the route");
+        }
+
+        Exchange exchange = ((NettyEndpoint)consumer.getEndpoint()).createExchange(ctx, messageEvent);
+        if (consumer.getConfiguration().isSync()) {
+            exchange.setPattern(ExchangePattern.InOut);
+        }
+        exchange.getIn().setBody(in);
+
+        try {
+            consumer.getProcessor().process(exchange);
+        } catch (Exception exception) {
+            throw new CamelExchangeException("Error in consumer while dispatching exchange for further processing", exchange, exception);
+        }
+
+        // Send back response if the communication is synchronous
+        if (consumer.getConfiguration().isSync()) {
+            sendResponsetoChannel(messageEvent, exchange);
+        }
+    }
+
+    /**
+     * Returns the value to put into a log line: byte arrays are converted to a
+     * String through the type converter, everything else is returned unchanged.
+     */
+    private Object toLoggable(Object payload) {
+        if (payload instanceof byte[]) {
+            return consumer.getEndpoint().getCamelContext().getTypeConverter().convertTo(String.class, payload);
+        }
+        return payload;
+    }
+
+    /**
+     * Describes the sender of the event for log messages. The address is taken
+     * from the event rather than from the channel, because an unconnected
+     * channel has no remote address of its own.
+     */
+    private static String describeRemote(MessageEvent messageEvent) {
+        Object address = messageEvent.getRemoteAddress();
+        if (address instanceof InetSocketAddress) {
+            InetSocketAddress inet = (InetSocketAddress) address;
+            return "remote host " + inet.getHostName() + " and port " + inet.getPort();
+        }
+        return "remote address " + address;
+    }
+
+}

Propchange: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ssl/SSLEngineFactory.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ssl/SSLEngineFactory.java?rev=927698&view=auto
==============================================================================
--- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ssl/SSLEngineFactory.java (added)
+++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ssl/SSLEngineFactory.java Fri Mar 26 05:46:07 2010
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty.ssl;
+
+import java.io.File;
+import java.security.KeyStore;
+
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.TrustManagerFactory;
+
+import org.apache.camel.converter.IOConverter;
+
+/**
+ * Creates client and server {@link SSLEngine}s from a TLS context that is
+ * initialised from a JKS key store and a JKS trust store, both opened with the
+ * same passphrase.
+ */
+public class SSLEngineFactory {
+
+    private static final String SSL_PROTOCOL = "TLS";
+    // Held per instance: a static field would let every new factory replace the
+    // context (and so the key material) used by all factories created before it.
+    private final SSLContext sslContext;
+
+    /**
+     * @param keyStoreFile   JKS file providing the key material
+     * @param trustStoreFile JKS file providing the trusted certificates
+     * @param passphrase     passphrase for both stores and for the keys in the key store
+     * @throws Exception if a store cannot be read or the context cannot be initialised
+     */
+    public SSLEngineFactory(File keyStoreFile, File trustStoreFile, char[] passphrase) throws Exception {
+        KeyStore ks = loadKeyStore(keyStoreFile, passphrase);
+        KeyStore ts = loadKeyStore(trustStoreFile, passphrase);
+
+        KeyManagerFactory kmf = KeyManagerFactory.getInstance("SunX509");
+        kmf.init(ks, passphrase);
+
+        TrustManagerFactory tmf = TrustManagerFactory.getInstance("SunX509");
+        tmf.init(ts);
+
+        SSLContext context = SSLContext.getInstance(SSL_PROTOCOL);
+        context.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);
+        sslContext = context;
+    }
+
+    /**
+     * Loads a JKS store from the given file and closes the file stream afterwards.
+     */
+    private static KeyStore loadKeyStore(File file, char[] passphrase) throws Exception {
+        KeyStore store = KeyStore.getInstance("JKS");
+        java.io.InputStream in = IOConverter.toInputStream(file);
+        try {
+            store.load(in, passphrase);
+        } finally {
+            in.close();
+        }
+        return store;
+    }
+
+    /**
+     * @return a new engine in server mode that requires client authentication
+     */
+    public SSLEngine createServerSSLEngine() {
+        SSLEngine serverEngine = sslContext.createSSLEngine();
+        serverEngine.setUseClientMode(false);
+        serverEngine.setNeedClientAuth(true);
+        return serverEngine;
+    }
+
+    /**
+     * @return a new engine in client mode
+     */
+    public SSLEngine createClientSSLEngine() {
+        SSLEngine clientEngine = sslContext.createSSLEngine();
+        clientEngine.setUseClientMode(true);
+        return clientEngine;
+    }
+
+}

Propchange: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ssl/SSLEngineFactory.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: camel/trunk/components/camel-netty/src/main/resources/META-INF/LICENSE.txt
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/resources/META-INF/LICENSE.txt?rev=927698&view=auto
==============================================================================
--- camel/trunk/components/camel-netty/src/main/resources/META-INF/LICENSE.txt (added)
+++ camel/trunk/components/camel-netty/src/main/resources/META-INF/LICENSE.txt Fri Mar 26 05:46:07 2010
@@ -0,0 +1,203 @@
+
+                                 Apache License
+                           Version 2.0, January 2004
+                        http://www.apache.org/licenses/
+
+   TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+   1. Definitions.
+
+      "License" shall mean the terms and conditions for use, reproduction,
+      and distribution as defined by Sections 1 through 9 of this document.
+
+      "Licensor" shall mean the copyright owner or entity authorized by
+      the copyright owner that is granting the License.
+
+      "Legal Entity" shall mean the union of the acting entity and all
+      other entities that control, are controlled by, or are under common
+      control with that entity. For the purposes of this definition,
+      "control" means (i) the power, direct or indirect, to cause the
+      direction or management of such entity, whether by contract or
+      otherwise, or (ii) ownership of fifty percent (50%) or more of the
+      outstanding shares, or (iii) beneficial ownership of such entity.
+
+      "You" (or "Your") shall mean an individual or Legal Entity
+      exercising permissions granted by this License.
+
+      "Source" form shall mean the preferred form for making modifications,
+      including but not limited to software source code, documentation
+      source, and configuration files.
+
+      "Object" form shall mean any form resulting from mechanical
+      transformation or translation of a Source form, including but
+      not limited to compiled object code, generated documentation,
+      and conversions to other media types.
+
+      "Work" shall mean the work of authorship, whether in Source or
+      Object form, made available under the License, as indicated by a
+      copyright notice that is included in or attached to the work
+      (an example is provided in the Appendix below).
+
+      "Derivative Works" shall mean any work, whether in Source or Object
+      form, that is based on (or derived from) the Work and for which the
+      editorial revisions, annotations, elaborations, or other modifications
+      represent, as a whole, an original work of authorship. For the purposes
+      of this License, Derivative Works shall not include works that remain
+      separable from, or merely link (or bind by name) to the interfaces of,
+      the Work and Derivative Works thereof.
+
+      "Contribution" shall mean any work of authorship, including
+      the original version of the Work and any modifications or additions
+      to that Work or Derivative Works thereof, that is intentionally
+      submitted to Licensor for inclusion in the Work by the copyright owner
+      or by an individual or Legal Entity authorized to submit on behalf of
+      the copyright owner. For the purposes of this definition, "submitted"
+      means any form of electronic, verbal, or written communication sent
+      to the Licensor or its representatives, including but not limited to
+      communication on electronic mailing lists, source code control systems,
+      and issue tracking systems that are managed by, or on behalf of, the
+      Licensor for the purpose of discussing and improving the Work, but
+      excluding communication that is conspicuously marked or otherwise
+      designated in writing by the copyright owner as "Not a Contribution."
+
+      "Contributor" shall mean Licensor and any individual or Legal Entity
+      on behalf of whom a Contribution has been received by Licensor and
+      subsequently incorporated within the Work.
+
+   2. Grant of Copyright License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      copyright license to reproduce, prepare Derivative Works of,
+      publicly display, publicly perform, sublicense, and distribute the
+      Work and such Derivative Works in Source or Object form.
+
+   3. Grant of Patent License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      (except as stated in this section) patent license to make, have made,
+      use, offer to sell, sell, import, and otherwise transfer the Work,
+      where such license applies only to those patent claims licensable
+      by such Contributor that are necessarily infringed by their
+      Contribution(s) alone or by combination of their Contribution(s)
+      with the Work to which such Contribution(s) was submitted. If You
+      institute patent litigation against any entity (including a
+      cross-claim or counterclaim in a lawsuit) alleging that the Work
+      or a Contribution incorporated within the Work constitutes direct
+      or contributory patent infringement, then any patent licenses
+      granted to You under this License for that Work shall terminate
+      as of the date such litigation is filed.
+
+   4. Redistribution. You may reproduce and distribute copies of the
+      Work or Derivative Works thereof in any medium, with or without
+      modifications, and in Source or Object form, provided that You
+      meet the following conditions:
+
+      (a) You must give any other recipients of the Work or
+          Derivative Works a copy of this License; and
+
+      (b) You must cause any modified files to carry prominent notices
+          stating that You changed the files; and
+
+      (c) You must retain, in the Source form of any Derivative Works
+          that You distribute, all copyright, patent, trademark, and
+          attribution notices from the Source form of the Work,
+          excluding those notices that do not pertain to any part of
+          the Derivative Works; and
+
+      (d) If the Work includes a "NOTICE" text file as part of its
+          distribution, then any Derivative Works that You distribute must
+          include a readable copy of the attribution notices contained
+          within such NOTICE file, excluding those notices that do not
+          pertain to any part of the Derivative Works, in at least one
+          of the following places: within a NOTICE text file distributed
+          as part of the Derivative Works; within the Source form or
+          documentation, if provided along with the Derivative Works; or,
+          within a display generated by the Derivative Works, if and
+          wherever such third-party notices normally appear. The contents
+          of the NOTICE file are for informational purposes only and
+          do not modify the License. You may add Your own attribution
+          notices within Derivative Works that You distribute, alongside
+          or as an addendum to the NOTICE text from the Work, provided
+          that such additional attribution notices cannot be construed
+          as modifying the License.
+
+      You may add Your own copyright statement to Your modifications and
+      may provide additional or different license terms and conditions
+      for use, reproduction, or distribution of Your modifications, or
+      for any such Derivative Works as a whole, provided Your use,
+      reproduction, and distribution of the Work otherwise complies with
+      the conditions stated in this License.
+
+   5. Submission of Contributions. Unless You explicitly state otherwise,
+      any Contribution intentionally submitted for inclusion in the Work
+      by You to the Licensor shall be under the terms and conditions of
+      this License, without any additional terms or conditions.
+      Notwithstanding the above, nothing herein shall supersede or modify
+      the terms of any separate license agreement you may have executed
+      with Licensor regarding such Contributions.
+
+   6. Trademarks. This License does not grant permission to use the trade
+      names, trademarks, service marks, or product names of the Licensor,
+      except as required for reasonable and customary use in describing the
+      origin of the Work and reproducing the content of the NOTICE file.
+
+   7. Disclaimer of Warranty. Unless required by applicable law or
+      agreed to in writing, Licensor provides the Work (and each
+      Contributor provides its Contributions) on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+      implied, including, without limitation, any warranties or conditions
+      of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+      PARTICULAR PURPOSE. You are solely responsible for determining the
+      appropriateness of using or redistributing the Work and assume any
+      risks associated with Your exercise of permissions under this License.
+
+   8. Limitation of Liability. In no event and under no legal theory,
+      whether in tort (including negligence), contract, or otherwise,
+      unless required by applicable law (such as deliberate and grossly
+      negligent acts) or agreed to in writing, shall any Contributor be
+      liable to You for damages, including any direct, indirect, special,
+      incidental, or consequential damages of any character arising as a
+      result of this License or out of the use or inability to use the
+      Work (including but not limited to damages for loss of goodwill,
+      work stoppage, computer failure or malfunction, or any and all
+      other commercial damages or losses), even if such Contributor
+      has been advised of the possibility of such damages.
+
+   9. Accepting Warranty or Additional Liability. While redistributing
+      the Work or Derivative Works thereof, You may choose to offer,
+      and charge a fee for, acceptance of support, warranty, indemnity,
+      or other liability obligations and/or rights consistent with this
+      License. However, in accepting such obligations, You may act only
+      on Your own behalf and on Your sole responsibility, not on behalf
+      of any other Contributor, and only if You agree to indemnify,
+      defend, and hold each Contributor harmless for any liability
+      incurred by, or claims asserted against, such Contributor by reason
+      of your accepting any such warranty or additional liability.
+
+   END OF TERMS AND CONDITIONS
+
+   APPENDIX: How to apply the Apache License to your work.
+
+      To apply the Apache License to your work, attach the following
+      boilerplate notice, with the fields enclosed by brackets "[]"
+      replaced with your own identifying information. (Don't include
+      the brackets!)  The text should be enclosed in the appropriate
+      comment syntax for the file format. We also recommend that a
+      file or class name and description of purpose be included on the
+      same "printed page" as the copyright notice for easier
+      identification within third-party archives.
+
+   Copyright [yyyy] [name of copyright owner]
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+

Propchange: camel/trunk/components/camel-netty/src/main/resources/META-INF/LICENSE.txt
------------------------------------------------------------------------------
    svn:eol-style = native

Added: camel/trunk/components/camel-netty/src/main/resources/META-INF/NOTICE.txt
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/resources/META-INF/NOTICE.txt?rev=927698&view=auto
==============================================================================
--- camel/trunk/components/camel-netty/src/main/resources/META-INF/NOTICE.txt (added)
+++ camel/trunk/components/camel-netty/src/main/resources/META-INF/NOTICE.txt Fri Mar 26 05:46:07 2010
@@ -0,0 +1,11 @@
+   =========================================================================
+   ==  NOTICE file corresponding to the section 4 d of                    ==
+   ==  the Apache License, Version 2.0,                                   ==
+   ==  in this case for the Apache Camel distribution.                    ==
+   =========================================================================
+
+   This product includes software developed by
+   The Apache Software Foundation (http://www.apache.org/).
+
+   Please read the different LICENSE files present in the licenses directory of
+   this distribution.

Propchange: camel/trunk/components/camel-netty/src/main/resources/META-INF/NOTICE.txt
------------------------------------------------------------------------------
    svn:eol-style = native

Added: camel/trunk/components/camel-netty/src/main/resources/META-INF/services/org/apache/camel/component/netty
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/resources/META-INF/services/org/apache/camel/component/netty?rev=927698&view=auto
==============================================================================
--- camel/trunk/components/camel-netty/src/main/resources/META-INF/services/org/apache/camel/component/netty (added)
+++ camel/trunk/components/camel-netty/src/main/resources/META-INF/services/org/apache/camel/component/netty Fri Mar 26 05:46:07 2010
@@ -0,0 +1,17 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+class=org.apache.camel.component.netty.NettyComponent

Added: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyConcurrentTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyConcurrentTest.java?rev=927698&view=auto
==============================================================================
--- camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyConcurrentTest.java (added)
+++ camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyConcurrentTest.java Fri Mar 26 05:46:07 2010
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.test.CamelTestSupport;
+
+/**
+ * Sends request/reply messages to a netty TCP consumer from a pool of threads and
+ * verifies that each request receives its own distinct reply.
+ */
+public class NettyConcurrentTest extends CamelTestSupport {
+
+    public void testNoConcurrentProducers() throws Exception {
+        doSendMessages(1, 1);
+    }
+
+    public void testConcurrentProducers() throws Exception {
+        doSendMessages(10, 5);
+    }
+
+    /**
+     * Submits <tt>files</tt> requests to the netty endpoint through a fixed thread pool
+     * and checks that the mock endpoint saw all of them and that all replies differ.
+     *
+     * @param files    number of requests to send; the request body is the request index
+     * @param poolSize number of threads used to send the requests
+     * @throws Exception if a request fails or an expectation is not met
+     */
+    private void doSendMessages(int files, int poolSize) throws Exception {
+        getMockEndpoint("mock:result").expectedMessageCount(files);
+
+        ExecutorService executor = Executors.newFixedThreadPool(poolSize);
+        try {
+            Map<Integer, Future<Object>> responses = new ConcurrentHashMap<Integer, Future<Object>>();
+            for (int i = 0; i < files; i++) {
+                final int index = i;
+                Future<Object> out = executor.submit(new Callable<Object>() {
+                    public Object call() throws Exception {
+                        return template.requestBody("netty:tcp://localhost:5150?sync=true", index, String.class);
+                    }
+                });
+                responses.put(index, out);
+            }
+
+            assertMockEndpointsSatisfied();
+            assertEquals(files, responses.size());
+
+            // get all responses
+            Set<Object> unique = new HashSet<Object>();
+            for (Future<Object> future : responses.values()) {
+                unique.add(future.get());
+            }
+
+            // every request carried a different body, so every reply must differ as well
+            assertEquals("Should be " + files + " unique responses", files, unique.size());
+        } finally {
+            // without this the pool's worker threads outlive the test method
+            executor.shutdownNow();
+        }
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            public void configure() throws Exception {
+                // reply with "Bye <request body>" and record the exchange on the mock endpoint
+                from("netty:tcp://localhost:5150?sync=true").process(new Processor() {
+                    public void process(Exchange exchange) throws Exception {
+                        String body = exchange.getIn().getBody(String.class);
+                        exchange.getOut().setBody("Bye " + body);
+                    }
+                }).to("mock:result");
+            }
+        };
+    }
+
+}

Propchange: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyConcurrentTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettySSLTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettySSLTest.java?rev=927698&view=auto
==============================================================================
--- camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettySSLTest.java (added)
+++ camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettySSLTest.java Fri Mar 26 05:46:07 2010
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.netty;
+
+import java.io.File;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.Produce;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.impl.JndiRegistry;
+import org.apache.camel.test.CamelTestSupport;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.junit.Test;
+
+/**
+ * Exercises a request/reply exchange between a netty producer and a netty consumer
+ * that both have SSL switched on, using store files and a passphrase looked up
+ * from the registry.
+ */
+public class NettySSLTest extends CamelTestSupport {
+    private static final transient Log LOG = LogFactory.getLog(NettySSLTest.class);
+
+    // endpoint shared by the consumer route and the producer call
+    private static final String SSL_ENDPOINT_URI =
+        "netty:tcp://localhost:5150?sync=true&ssl=true&passphrase=#password&keyStoreFile=#ksf&trustStoreFile=#tsf";
+    private static final String REQUEST_BODY =
+        "Epitaph in Kohima, India marking the WWII Battle of Kohima and Imphal, Burma Campaign - Attributed to John Maxwell Edmonds";
+    private static final String REPLY_BODY =
+        "When You Go Home, Tell Them Of Us And Say, For Your Tomorrow, We Gave Our Today.";
+
+    @Produce(uri = "direct:start")
+    protected ProducerTemplate producerTemplate;
+
+    /**
+     * Registers the passphrase and the key/trust store files under the names
+     * referenced by the endpoint URI.
+     */
+    @Override
+    protected JndiRegistry createRegistry() throws Exception {
+        File keyStore = new File("src/test/resources/keystore.jks");
+        File trustStore = new File("src/test/resources/keystore.jks");
+
+        JndiRegistry jndi = new JndiRegistry(createJndiContext());
+        jndi.bind("password", "changeit");
+        jndi.bind("ksf", keyStore);
+        jndi.bind("tsf", trustStore);
+        return jndi;
+    }
+
+    /**
+     * The route is added by the test method itself, so no route builder is used.
+     */
+    @Override
+    public boolean isUseRouteBuilder() {
+        return false;
+    }
+
+    @Test
+    public void testSSLInOutWithNettyConsumer() throws Exception {
+        context.addRoutes(new SslReplyRoute());
+        context.start();
+
+        debug("Beginning Test ---> testSSLInOutWithNettyConsumer()");
+        sendRequest();
+        debug("Completed Test ---> testSSLInOutWithNettyConsumer()");
+
+        context.stop();
+    }
+
+    /**
+     * Sends the request over the SSL endpoint and checks the reply text.
+     */
+    private void sendRequest() throws Exception {
+        String reply = producerTemplate.requestBody(SSL_ENDPOINT_URI, REQUEST_BODY, String.class);
+        assertEquals(REPLY_BODY, reply);
+    }
+
+    /**
+     * Logs the message at debug level when debug logging is enabled.
+     */
+    private static void debug(String message) {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug(message);
+        }
+    }
+
+    /**
+     * Consumes from the SSL endpoint and answers every exchange with the fixed reply.
+     */
+    private static final class SslReplyRoute extends RouteBuilder {
+        public void configure() {
+            from(SSL_ENDPOINT_URI).process(new FixedReplyProcessor());
+        }
+    }
+
+    /**
+     * Sets the fixed reply text as the OUT body of the exchange.
+     */
+    private static final class FixedReplyProcessor implements Processor {
+        public void process(Exchange exchange) throws Exception {
+            exchange.getOut().setBody(REPLY_BODY);
+        }
+    }
+
+}

Propchange: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettySSLTest.java
------------------------------------------------------------------------------
    svn:eol-style = native