You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2016/05/31 09:39:53 UTC

camel git commit: CAMEL-10003: camel-netty4 - add support for using native transport.

Repository: camel
Updated Branches:
  refs/heads/master 109b8a436 -> e8e6cc4a6


CAMEL-10003: camel-netty4 - add support for using native transport.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/e8e6cc4a
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/e8e6cc4a
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/e8e6cc4a

Branch: refs/heads/master
Commit: e8e6cc4a6da2c19050508b0c02d425519b4ea714
Parents: 109b8a4
Author: Claus Ibsen <da...@apache.org>
Authored: Tue May 31 11:39:45 2016 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Tue May 31 11:39:45 2016 +0200

----------------------------------------------------------------------
 components/camel-netty4/src/main/docs/netty4.adoc |  7 ++++++-
 .../ClientModeTCPNettyServerBootstrapFactory.java |  8 +++++++-
 .../camel/component/netty4/NettyProducer.java     | 18 +++++++++++++++---
 .../netty4/NettyServerBootstrapConfiguration.java | 16 +++++++++++++++-
 .../netty4/NettyServerBossPoolBuilder.java        | 17 ++++++++++++++++-
 .../component/netty4/NettyWorkerPoolBuilder.java  | 17 ++++++++++++++++-
 .../SingleTCPNettyServerBootstrapFactory.java     |  9 ++++++++-
 .../SingleUDPNettyServerBootstrapFactory.java     |  8 +++++++-
 8 files changed, 90 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/e8e6cc4a/components/camel-netty4/src/main/docs/netty4.adoc
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/main/docs/netty4.adoc b/components/camel-netty4/src/main/docs/netty4.adoc
index 2a01619..0c8d518 100644
--- a/components/camel-netty4/src/main/docs/netty4.adoc
+++ b/components/camel-netty4/src/main/docs/netty4.adoc
@@ -77,8 +77,10 @@ The Netty4 component supports 3 options which are listed below.
 
 
 
+
+
 // endpoint options: START
-The Netty4 component supports 72 endpoint options which are listed below:
+The Netty4 component supports 73 endpoint options which are listed below:
 
 {% raw %}
 [width="100%",cols="2s,1,1m,1m,5",options="header"]
@@ -126,6 +128,7 @@ The Netty4 component supports 72 endpoint options which are listed below:
 | useByteBuf | producer (advanced) | false | boolean | If the useByteBuf is true netty producer will turn the message body into ByteBuf before sending it out.
 | bootstrapConfiguration | advanced |  | NettyServerBootstrapConfiguration | To use a custom configured NettyServerBootstrapConfiguration for configuring this endpoint.
 | exchangePattern | advanced | InOnly | ExchangePattern | Sets the default exchange pattern when creating an exchange
+| nativeTransport | advanced | false | boolean | Whether to use native transport instead of NIO. Native transport takes advantage of the host operating system and is only supported on some platforms. You need to add the netty JAR for the host operating system you are using. See more details at: http://netty.io/wiki/native-transports.html
 | options | advanced |  | Map | Allows to configure additional netty options using option. as prefix. For example option.child.keepAlive=false to set the netty option child.keepAlive=false. See the Netty documentation for possible options that can be used.
 | receiveBufferSize | advanced | 65536 | int | The TCP/UDP buffer sizes to be used during inbound communication. Size is bytes.
 | receiveBufferSizePredictor | advanced |  | int | Configures the buffer size predictor. See details at Jetty documentation and this mail thread.
@@ -163,6 +166,8 @@ The Netty4 component supports 72 endpoint options which are listed below:
 
 
 
+
+
 [[Netty4-RegistrybasedOptions]]
 Registry based Options
 ^^^^^^^^^^^^^^^^^^^^^^

http://git-wip-us.apache.org/repos/asf/camel/blob/e8e6cc4a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/ClientModeTCPNettyServerBootstrapFactory.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/ClientModeTCPNettyServerBootstrapFactory.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/ClientModeTCPNettyServerBootstrapFactory.java
index fc9cb2c..653da19 100644
--- a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/ClientModeTCPNettyServerBootstrapFactory.java
+++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/ClientModeTCPNettyServerBootstrapFactory.java
@@ -29,6 +29,7 @@ import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelOption;
 import io.netty.channel.EventLoop;
 import io.netty.channel.EventLoopGroup;
+import io.netty.channel.epoll.EpollSocketChannel;
 import io.netty.channel.socket.nio.NioSocketChannel;
 
 import org.apache.camel.CamelContext;
@@ -120,6 +121,7 @@ public class ClientModeTCPNettyServerBootstrapFactory extends ServiceSupport imp
         if (wg == null) {
             // create new pool which we should shutdown when stopping as its not shared
             workerGroup = new NettyWorkerPoolBuilder()
+                    .withNativeTransport(configuration.isNativeTransport())
                     .withWorkerCount(configuration.getWorkerCount())
                     .withName("NettyServerTCPWorker")
                     .build();
@@ -127,7 +129,11 @@ public class ClientModeTCPNettyServerBootstrapFactory extends ServiceSupport imp
         }
         
         clientBootstrap = new Bootstrap();
-        clientBootstrap.channel(NioSocketChannel.class);
+        if (configuration.isNativeTransport()) {
+            clientBootstrap.channel(EpollSocketChannel.class);
+        } else {
+            clientBootstrap.channel(NioSocketChannel.class);
+        }
         clientBootstrap.group(wg);
         clientBootstrap.option(ChannelOption.SO_KEEPALIVE, configuration.isKeepAlive());
         clientBootstrap.option(ChannelOption.TCP_NODELAY, configuration.isTcpNoDelay());

http://git-wip-us.apache.org/repos/asf/camel/blob/e8e6cc4a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyProducer.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyProducer.java
index c181ebf..a6bce2d 100644
--- a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyProducer.java
+++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyProducer.java
@@ -30,6 +30,8 @@ import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelOption;
 import io.netty.channel.EventLoopGroup;
+import io.netty.channel.epoll.EpollDatagramChannel;
+import io.netty.channel.epoll.EpollSocketChannel;
 import io.netty.channel.group.ChannelGroup;
 import io.netty.channel.group.ChannelGroupFuture;
 import io.netty.channel.group.DefaultChannelGroup;
@@ -93,7 +95,9 @@ public class NettyProducer extends DefaultAsyncProducer {
         super.doStart();
         if (configuration.getWorkerGroup() == null) {
             // create new pool which we should shutdown when stopping as its not shared
-            workerGroup = new NettyWorkerPoolBuilder().withWorkerCount(configuration.getWorkerCount())
+            workerGroup = new NettyWorkerPoolBuilder()
+                .withNativeTransport(configuration.isNativeTransport())
+                .withWorkerCount(configuration.getWorkerCount())
                 .withName("NettyClientTCPWorker").build();
         }
         
@@ -391,7 +395,11 @@ public class NettyProducer extends DefaultAsyncProducer {
         if (isTcp()) {
             // its okay to create a new bootstrap for each new channel
             Bootstrap clientBootstrap = new Bootstrap();
-            clientBootstrap.channel(NioSocketChannel.class);
+            if (configuration.isNativeTransport()) {
+                clientBootstrap.channel(EpollSocketChannel.class);
+            } else {
+                clientBootstrap.channel(NioSocketChannel.class);
+            }
             clientBootstrap.group(getWorkerGroup());
             clientBootstrap.option(ChannelOption.SO_KEEPALIVE, configuration.isKeepAlive());
             clientBootstrap.option(ChannelOption.TCP_NODELAY, configuration.isTcpNoDelay());
@@ -418,7 +426,11 @@ public class NettyProducer extends DefaultAsyncProducer {
         } else {
             // its okay to create a new bootstrap for each new channel
             Bootstrap connectionlessClientBootstrap = new Bootstrap();
-            connectionlessClientBootstrap.channel(NioDatagramChannel.class);
+            if (configuration.isNativeTransport()) {
+                connectionlessClientBootstrap.channel(EpollDatagramChannel.class);
+            } else {
+                connectionlessClientBootstrap.channel(NioDatagramChannel.class);
+            }
             connectionlessClientBootstrap.group(getWorkerGroup());
             connectionlessClientBootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, configuration.getConnectTimeout());
             connectionlessClientBootstrap.option(ChannelOption.SO_BROADCAST, configuration.isBroadcast());

http://git-wip-us.apache.org/repos/asf/camel/blob/e8e6cc4a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyServerBootstrapConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyServerBootstrapConfiguration.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyServerBootstrapConfiguration.java
index 5db1452..3972da5 100644
--- a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyServerBootstrapConfiguration.java
+++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyServerBootstrapConfiguration.java
@@ -93,6 +93,8 @@ public class NettyServerBootstrapConfiguration implements Cloneable {
     protected String enabledProtocols = DEFAULT_ENABLED_PROTOCOLS;
     @UriParam(label = "security")
     protected String passphrase;
+    @UriParam(label = "advanced")
+    protected boolean nativeTransport;
     @UriParam(label = "consumer,advanced")
     protected EventLoopGroup bossGroup;
     @UriParam(label = "consumer,advanced")
@@ -465,6 +467,18 @@ public class NettyServerBootstrapConfiguration implements Cloneable {
         this.options = options;
     }
 
+    public boolean isNativeTransport() {
+        return nativeTransport;
+    }
+
+    /**
+     * Whether to use native transport instead of NIO. Native transport takes advantage of the host operating system and is only supported on some platforms.
+     * You need to add the netty JAR for the host operating system you are using. See more details at: http://netty.io/wiki/native-transports.html
+     */
+    public void setNativeTransport(boolean nativeTransport) {
+        this.nativeTransport = nativeTransport;
+    }
+
     public EventLoopGroup getBossGroup() {
         return bossGroup;
     }
@@ -475,7 +489,7 @@ public class NettyServerBootstrapConfiguration implements Cloneable {
     public void setBossGroup(EventLoopGroup bossGroup) {
         this.bossGroup = bossGroup;
     }
-    
+
     public EventLoopGroup getWorkerGroup() {
         return workerGroup;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/e8e6cc4a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyServerBossPoolBuilder.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyServerBossPoolBuilder.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyServerBossPoolBuilder.java
index 4f1611e..75d3d13 100644
--- a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyServerBossPoolBuilder.java
+++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyServerBossPoolBuilder.java
@@ -17,6 +17,7 @@
 package org.apache.camel.component.netty4;
 
 import io.netty.channel.EventLoopGroup;
+import io.netty.channel.epoll.EpollEventLoopGroup;
 import io.netty.channel.nio.NioEventLoopGroup;
 import org.apache.camel.util.concurrent.CamelThreadFactory;
 
@@ -30,6 +31,7 @@ public final class NettyServerBossPoolBuilder {
     private String name = "NettyServerBoss";
     private String pattern;
     private int bossCount = 1;
+    private boolean nativeTransport;
 
     public void setName(String name) {
         this.name = name;
@@ -43,6 +45,10 @@ public final class NettyServerBossPoolBuilder {
         this.bossCount = bossCount;
     }
 
+    public void setNativeTransport(boolean nativeTransport) {
+        this.nativeTransport = nativeTransport;
+    }
+
     public NettyServerBossPoolBuilder withName(String name) {
         setName(name);
         return this;
@@ -58,10 +64,19 @@ public final class NettyServerBossPoolBuilder {
         return this;
     }
 
+    public NettyServerBossPoolBuilder withNativeTransport(boolean nativeTransport) {
+        setNativeTransport(nativeTransport);
+        return this;
+    }
+
     /**
      * Creates a new boss pool.
      */
     public EventLoopGroup build() {
-        return new NioEventLoopGroup(bossCount, new CamelThreadFactory(pattern, name, false));
+        if (nativeTransport) {
+            return new EpollEventLoopGroup(bossCount, new CamelThreadFactory(pattern, name, false));
+        } else {
+            return new NioEventLoopGroup(bossCount, new CamelThreadFactory(pattern, name, false));
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/e8e6cc4a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyWorkerPoolBuilder.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyWorkerPoolBuilder.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyWorkerPoolBuilder.java
index 125bbce..e667cc6 100644
--- a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyWorkerPoolBuilder.java
+++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyWorkerPoolBuilder.java
@@ -17,6 +17,7 @@
 package org.apache.camel.component.netty4;
 
 import io.netty.channel.EventLoopGroup;
+import io.netty.channel.epoll.EpollEventLoopGroup;
 import io.netty.channel.nio.NioEventLoopGroup;
 import org.apache.camel.util.concurrent.CamelThreadFactory;
 
@@ -29,6 +30,7 @@ public final class NettyWorkerPoolBuilder {
     private String name = "NettyWorker";
     private String pattern;
     private int workerCount;
+    private boolean nativeTransport;
     private volatile EventLoopGroup workerPool;
 
     public void setName(String name) {
@@ -43,6 +45,10 @@ public final class NettyWorkerPoolBuilder {
         this.workerCount = workerCount;
     }
 
+    public void setNativeTransport(boolean nativeTransport) {
+        this.nativeTransport = nativeTransport;
+    }
+
     public NettyWorkerPoolBuilder withName(String name) {
         setName(name);
         return this;
@@ -58,12 +64,21 @@ public final class NettyWorkerPoolBuilder {
         return this;
     }
 
+    public NettyWorkerPoolBuilder withNativeTransport(boolean nativeTransport) {
+        setNativeTransport(nativeTransport);
+        return this;
+    }
+
     /**
      * Creates a new worker pool.
      */
     public EventLoopGroup build() {
         int count = workerCount > 0 ? workerCount : NettyHelper.DEFAULT_IO_THREADS;
-        workerPool = new NioEventLoopGroup(count, new CamelThreadFactory(pattern, name, false));
+        if (nativeTransport) {
+            workerPool = new EpollEventLoopGroup(count, new CamelThreadFactory(pattern, name, false));
+        } else {
+            workerPool = new NioEventLoopGroup(count, new CamelThreadFactory(pattern, name, false));
+        }
         return workerPool;
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/e8e6cc4a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/SingleTCPNettyServerBootstrapFactory.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/SingleTCPNettyServerBootstrapFactory.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/SingleTCPNettyServerBootstrapFactory.java
index 9184fae..887113f 100644
--- a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/SingleTCPNettyServerBootstrapFactory.java
+++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/SingleTCPNettyServerBootstrapFactory.java
@@ -26,6 +26,7 @@ import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelOption;
 import io.netty.channel.EventLoopGroup;
+import io.netty.channel.epoll.EpollServerSocketChannel;
 import io.netty.channel.group.ChannelGroup;
 import io.netty.channel.group.DefaultChannelGroup;
 import io.netty.channel.socket.nio.NioServerSocketChannel;
@@ -141,6 +142,7 @@ public class SingleTCPNettyServerBootstrapFactory extends ServiceSupport impleme
         if (bg == null) {
             // create new pool which we should shutdown when stopping as its not shared
             bossGroup = new NettyServerBossPoolBuilder()
+                    .withNativeTransport(configuration.isNativeTransport())
                     .withBossCount(configuration.getBossCount())
                     .withName("NettyServerTCPBoss")
                     .build();
@@ -149,6 +151,7 @@ public class SingleTCPNettyServerBootstrapFactory extends ServiceSupport impleme
         if (wg == null) {
             // create new pool which we should shutdown when stopping as its not shared
             workerGroup = new NettyWorkerPoolBuilder()
+                    .withNativeTransport(configuration.isNativeTransport())
                     .withWorkerCount(configuration.getWorkerCount())
                     .withName("NettyServerTCPWorker")
                     .build();
@@ -156,7 +159,11 @@ public class SingleTCPNettyServerBootstrapFactory extends ServiceSupport impleme
         }
         
         serverBootstrap = new ServerBootstrap();
-        serverBootstrap.group(bg, wg).channel(NioServerSocketChannel.class);
+        if (configuration.isNativeTransport()) {
+            serverBootstrap.group(bg, wg).channel(EpollServerSocketChannel.class);
+        } else {
+            serverBootstrap.group(bg, wg).channel(NioServerSocketChannel.class);
+        }
         serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, configuration.isKeepAlive());
         serverBootstrap.childOption(ChannelOption.TCP_NODELAY, configuration.isTcpNoDelay());
         serverBootstrap.option(ChannelOption.SO_REUSEADDR, configuration.isReuseAddress());

http://git-wip-us.apache.org/repos/asf/camel/blob/e8e6cc4a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/SingleUDPNettyServerBootstrapFactory.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/SingleUDPNettyServerBootstrapFactory.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/SingleUDPNettyServerBootstrapFactory.java
index dcaa262..3f68bd6 100644
--- a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/SingleUDPNettyServerBootstrapFactory.java
+++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/SingleUDPNettyServerBootstrapFactory.java
@@ -28,6 +28,7 @@ import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelOption;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.FixedRecvByteBufAllocator;
+import io.netty.channel.epoll.EpollDatagramChannel;
 import io.netty.channel.group.ChannelGroup;
 import io.netty.channel.group.DefaultChannelGroup;
 import io.netty.channel.socket.DatagramChannel;
@@ -121,6 +122,7 @@ public class SingleUDPNettyServerBootstrapFactory extends ServiceSupport impleme
         if (wg == null) {
             // create new pool which we should shutdown when stopping as its not shared
             workerGroup = new NettyWorkerPoolBuilder()
+                    .withNativeTransport(configuration.isNativeTransport())
                     .withWorkerCount(configuration.getWorkerCount())
                     .withName("NettyServerTCPWorker")
                     .build();
@@ -128,7 +130,11 @@ public class SingleUDPNettyServerBootstrapFactory extends ServiceSupport impleme
         }
         
         Bootstrap bootstrap = new Bootstrap();
-        bootstrap.group(wg).channel(NioDatagramChannel.class);
+        if (configuration.isNativeTransport()) {
+            bootstrap.group(wg).channel(EpollDatagramChannel.class);
+        } else {
+            bootstrap.group(wg).channel(NioDatagramChannel.class);
+        }
         // We cannot set the child option here      
         bootstrap.option(ChannelOption.SO_REUSEADDR, configuration.isReuseAddress());
         bootstrap.option(ChannelOption.SO_SNDBUF, configuration.getSendBufferSize());