You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2012/12/19 10:50:52 UTC

svn commit: r1423808 - in /camel/trunk/components/camel-netty/src: main/java/org/apache/camel/component/netty/ test/java/org/apache/camel/component/netty/

Author: davsclaus
Date: Wed Dec 19 09:50:50 2012
New Revision: 1423808

URL: http://svn.apache.org/viewvc?rev=1423808&view=rev
Log:
CAMEL-5896: Allow to configure netty options using option. prefix in uri

Added:
    camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyOptionTest.java
      - copied, changed from r1423766, camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyBacklogTest.java
Modified:
    camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java
    camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java
    camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java

Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java?rev=1423808&r1=1423807&r2=1423808&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java (original)
+++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java Wed Dec 19 09:50:50 2012
@@ -26,6 +26,7 @@ import java.util.Map;
 import org.apache.camel.LoggingLevel;
 import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.util.EndpointHelper;
+import org.apache.camel.util.IntrospectionSupport;
 import org.apache.camel.util.ObjectHelper;
 import org.apache.camel.util.jsse.SSLContextParameters;
 import org.jboss.netty.buffer.ChannelBuffer;
@@ -83,6 +84,7 @@ public class NettyConfiguration implemen
     private int producerPoolMaxIdle = 100;
     private long producerPoolMinEvictableIdle = 5 * 60 * 1000L;
     private int backlog;
+    private Map<String, Object> options;
     
     /**
      * Returns a copy of this configuration
@@ -161,6 +163,12 @@ public class NettyConfiguration implemen
         EndpointHelper.setReferenceProperties(component.getCamelContext(), this, parameters);
         EndpointHelper.setProperties(component.getCamelContext(), this, parameters);
 
+        // additional netty options, we don't want to store an empty map, so set it as null if empty
+        options = IntrospectionSupport.extractProperties(parameters, "option.");
+        if (options !=  null && options.isEmpty()) {
+            options = null;
+        }
+
         // add default encoders and decoders
         if (encoders.isEmpty() && decoders.isEmpty()) {
             if (allowDefaultCodec) {
@@ -582,6 +590,17 @@ public class NettyConfiguration implemen
         this.backlog = backlog;
     }
 
+    public Map<String, Object> getOptions() {
+        return options;
+    }
+
+    /**
+     * Additional options to set on Netty.
+     */
+    public void setOptions(Map<String, Object> options) {
+        this.options = options;
+    }
+
     private static <T> void addToHandlersList(List<T> configured, List<T> handlers, Class<T> handlerType) {
         if (handlers != null) {
             for (int x = 0; x < handlers.size(); x++) {

Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java?rev=1423808&r1=1423807&r2=1423808&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java (original)
+++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java Wed Dec 19 09:50:50 2012
@@ -17,6 +17,7 @@
 package org.apache.camel.component.netty;
 
 import java.net.InetSocketAddress;
+import java.util.Map;
 import java.util.concurrent.ExecutorService;
 
 import org.apache.camel.CamelContext;
@@ -186,10 +187,15 @@ public class NettyConsumer extends Defau
             serverBootstrap.setOption("backlog", configuration.getBacklog());
         }
 
-        if (log.isDebugEnabled()) {
-            log.debug("Created ServerBootstrap {} with options: {}", serverBootstrap, serverBootstrap.getOptions());
+        // set any additional netty options
+        if (configuration.getOptions() != null) {
+            for (Map.Entry<String, Object> entry : configuration.getOptions().entrySet()) {
+                serverBootstrap.setOption(entry.getKey(), entry.getValue());
+            }
         }
 
+        log.info("Created ServerBootstrap {} with options: {}", serverBootstrap, serverBootstrap.getOptions());
+
         // set the pipeline factory, which creates the pipeline for each newly created channels
         serverBootstrap.setPipelineFactory(pipelineFactory);
 
@@ -223,10 +229,15 @@ public class NettyConsumer extends Defau
             connectionlessServerBootstrap.setOption("backlog", configuration.getBacklog());
         }
 
-        if (log.isDebugEnabled()) {
-            log.debug("Created ConnectionlessBootstrap {} with options: {}", connectionlessServerBootstrap, connectionlessServerBootstrap.getOptions());
+        // set any additional netty options
+        if (configuration.getOptions() != null) {
+            for (Map.Entry<String, Object> entry : configuration.getOptions().entrySet()) {
+                connectionlessServerBootstrap.setOption(entry.getKey(), entry.getValue());
+            }
         }
 
+        log.info("Created ConnectionlessBootstrap {} with options: {}", connectionlessServerBootstrap, connectionlessServerBootstrap.getOptions());
+
         // set the pipeline factory, which creates the pipeline for each newly created channels
         connectionlessServerBootstrap.setPipelineFactory(pipelineFactory);
 

Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java?rev=1423808&r1=1423807&r2=1423808&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java (original)
+++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java Wed Dec 19 09:50:50 2012
@@ -17,6 +17,7 @@
 package org.apache.camel.component.netty;
 
 import java.net.InetSocketAddress;
+import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.RejectedExecutionException;
@@ -314,10 +315,20 @@ public class NettyProducer extends Defau
             clientBootstrap.setOption("reuseAddress", configuration.isReuseAddress());
             clientBootstrap.setOption("connectTimeoutMillis", configuration.getConnectTimeout());
 
+            // set any additional netty options
+            if (configuration.getOptions() != null) {
+                for (Map.Entry<String, Object> entry : configuration.getOptions().entrySet()) {
+                    clientBootstrap.setOption(entry.getKey(), entry.getValue());
+                }
+            }
+
             // set the pipeline factory, which creates the pipeline for each newly created channels
             clientBootstrap.setPipelineFactory(pipelineFactory);
             answer = clientBootstrap.connect(new InetSocketAddress(configuration.getHost(), configuration.getPort()));
-            LOG.trace("Created new TCP client bootstrap connecting to {}:{}", configuration.getHost(), configuration.getPort());
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Created new TCP client bootstrap connecting to {}:{} with options: {}",
+                        new Object[]{configuration.getHost(), configuration.getPort(), clientBootstrap.getOptions()});
+            }
             return answer;
         } else {
             // its okay to create a new bootstrap for each new channel
@@ -330,13 +341,24 @@ public class NettyProducer extends Defau
             connectionlessClientBootstrap.setOption("sendBufferSize", configuration.getSendBufferSize());
             connectionlessClientBootstrap.setOption("receiveBufferSize", configuration.getReceiveBufferSize());
 
+            // set any additional netty options
+            if (configuration.getOptions() != null) {
+                for (Map.Entry<String, Object> entry : configuration.getOptions().entrySet()) {
+                    connectionlessClientBootstrap.setOption(entry.getKey(), entry.getValue());
+                }
+            }
+
             // set the pipeline factory, which creates the pipeline for each newly created channels
             connectionlessClientBootstrap.setPipelineFactory(pipelineFactory);
             // bind and store channel so we can close it when stopping
             Channel channel = connectionlessClientBootstrap.bind(new InetSocketAddress(0));
             ALL_CHANNELS.add(channel);
             answer = connectionlessClientBootstrap.connect(new InetSocketAddress(configuration.getHost(), configuration.getPort()));
-            LOG.trace("Created new UDP client bootstrap connecting to {}:{}", configuration.getHost(), configuration.getPort());
+
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Created new UDP client bootstrap connecting to {}:{} with options: {}",
+                       new Object[]{configuration.getHost(), configuration.getPort(), connectionlessClientBootstrap.getOptions()});
+            }
             return answer;
         }
     }
@@ -347,7 +369,7 @@ public class NettyProducer extends Defau
         channelFuture.addListener(new ChannelFutureListener() {
             @Override
             public void operationComplete(ChannelFuture channelFuture) throws Exception {
-                LOG.debug("Operation complete {}", channelFuture);
+                LOG.trace("Operation complete {}", channelFuture);
                 latch.countDown();
             }
         });

Copied: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyOptionTest.java (from r1423766, camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyBacklogTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyOptionTest.java?p2=camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyOptionTest.java&p1=camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyBacklogTest.java&r1=1423766&r2=1423808&rev=1423808&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyBacklogTest.java (original)
+++ camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyOptionTest.java Wed Dec 19 09:50:50 2012
@@ -20,14 +20,14 @@ import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;
 
-public class NettyBacklogTest extends NettyTCPSyncTest {
+public class NettyOptionTest extends NettyTCPSyncTest {
     
     @Override
     protected RouteBuilder createRouteBuilder() throws Exception {
         return new RouteBuilder() {
             @Override
             public void configure() throws Exception {
-                from("netty:tcp://localhost:{{port}}?sync=true&backlog=500")
+                from("netty:tcp://localhost:{{port}}?sync=true&option.child.keepAlive=false")
                     .process(new Processor() {
                         public void process(Exchange exchange) throws Exception {
                             if (exchange.getIn().getBody() instanceof Poetry) {