You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2023/04/15 10:31:25 UTC
[camel] branch camel-3.x updated: CAMEL-19270 - camel-netty - add support for unix domain sockets (#9869)
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch camel-3.x
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-3.x by this push:
new 402aa7ab8e2 CAMEL-19270 - camel-netty - add support for unix domain sockets (#9869)
402aa7ab8e2 is described below
commit 402aa7ab8e22e2f8fa8c9c52b28898642297dda5
Author: Ivan Mashtak <de...@gmail.com>
AuthorDate: Sat Apr 15 13:30:39 2023 +0300
CAMEL-19270 - camel-netty - add support for unix domain sockets (#9869)
---
.../component/netty/NettyComponentConfigurer.java | 6 +++
.../netty/NettyConfigurationConfigurer.java | 6 +++
.../component/netty/NettyEndpointConfigurer.java | 6 +++
.../component/netty/NettyEndpointUriFactory.java | 3 +-
.../org/apache/camel/component/netty/netty.json | 2 +
.../camel/component/netty/NettyProducer.java | 28 ++++++++---
.../netty/NettyServerBootstrapConfiguration.java | 20 ++++++++
.../SingleTCPNettyServerBootstrapFactory.java | 35 ++++++++++---
.../camel/component/netty/NettyTCPSyncUDSTest.java | 58 ++++++++++++++++++++++
9 files changed, 150 insertions(+), 14 deletions(-)
diff --git a/components/camel-netty/src/generated/java/org/apache/camel/component/netty/NettyComponentConfigurer.java b/components/camel-netty/src/generated/java/org/apache/camel/component/netty/NettyComponentConfigurer.java
index 65274daa322..73bfbf1c7f6 100644
--- a/components/camel-netty/src/generated/java/org/apache/camel/component/netty/NettyComponentConfigurer.java
+++ b/components/camel-netty/src/generated/java/org/apache/camel/component/netty/NettyComponentConfigurer.java
@@ -154,6 +154,8 @@ public class NettyComponentConfigurer extends PropertyConfigurerSupport implemen
case "udpByteArrayCodec": getOrCreateConfiguration(target).setUdpByteArrayCodec(property(camelContext, boolean.class, value)); return true;
case "udpconnectionlesssending":
case "udpConnectionlessSending": getOrCreateConfiguration(target).setUdpConnectionlessSending(property(camelContext, boolean.class, value)); return true;
+ case "unixdomainsocketpath":
+ case "unixDomainSocketPath": getOrCreateConfiguration(target).setUnixDomainSocketPath(property(camelContext, java.lang.String.class, value)); return true;
case "usebytebuf":
case "useByteBuf": getOrCreateConfiguration(target).setUseByteBuf(property(camelContext, boolean.class, value)); return true;
case "useglobalsslcontextparameters":
@@ -297,6 +299,8 @@ public class NettyComponentConfigurer extends PropertyConfigurerSupport implemen
case "udpByteArrayCodec": return boolean.class;
case "udpconnectionlesssending":
case "udpConnectionlessSending": return boolean.class;
+ case "unixdomainsocketpath":
+ case "unixDomainSocketPath": return java.lang.String.class;
case "usebytebuf":
case "useByteBuf": return boolean.class;
case "useglobalsslcontextparameters":
@@ -441,6 +445,8 @@ public class NettyComponentConfigurer extends PropertyConfigurerSupport implemen
case "udpByteArrayCodec": return getOrCreateConfiguration(target).isUdpByteArrayCodec();
case "udpconnectionlesssending":
case "udpConnectionlessSending": return getOrCreateConfiguration(target).isUdpConnectionlessSending();
+ case "unixdomainsocketpath":
+ case "unixDomainSocketPath": return getOrCreateConfiguration(target).getUnixDomainSocketPath();
case "usebytebuf":
case "useByteBuf": return getOrCreateConfiguration(target).isUseByteBuf();
case "useglobalsslcontextparameters":
diff --git a/components/camel-netty/src/generated/java/org/apache/camel/component/netty/NettyConfigurationConfigurer.java b/components/camel-netty/src/generated/java/org/apache/camel/component/netty/NettyConfigurationConfigurer.java
index 9f6e1b59847..7e0829b30ab 100644
--- a/components/camel-netty/src/generated/java/org/apache/camel/component/netty/NettyConfigurationConfigurer.java
+++ b/components/camel-netty/src/generated/java/org/apache/camel/component/netty/NettyConfigurationConfigurer.java
@@ -163,6 +163,8 @@ public class NettyConfigurationConfigurer extends org.apache.camel.support.compo
case "UdpByteArrayCodec": target.setUdpByteArrayCodec(property(camelContext, boolean.class, value)); return true;
case "udpconnectionlesssending":
case "UdpConnectionlessSending": target.setUdpConnectionlessSending(property(camelContext, boolean.class, value)); return true;
+ case "unixdomainsocketpath":
+ case "UnixDomainSocketPath": target.setUnixDomainSocketPath(property(camelContext, java.lang.String.class, value)); return true;
case "usebytebuf":
case "UseByteBuf": target.setUseByteBuf(property(camelContext, boolean.class, value)); return true;
case "usingexecutorservice":
@@ -320,6 +322,8 @@ public class NettyConfigurationConfigurer extends org.apache.camel.support.compo
case "UdpByteArrayCodec": return boolean.class;
case "udpconnectionlesssending":
case "UdpConnectionlessSending": return boolean.class;
+ case "unixdomainsocketpath":
+ case "UnixDomainSocketPath": return java.lang.String.class;
case "usebytebuf":
case "UseByteBuf": return boolean.class;
case "usingexecutorservice":
@@ -478,6 +482,8 @@ public class NettyConfigurationConfigurer extends org.apache.camel.support.compo
case "UdpByteArrayCodec": return target.isUdpByteArrayCodec();
case "udpconnectionlesssending":
case "UdpConnectionlessSending": return target.isUdpConnectionlessSending();
+ case "unixdomainsocketpath":
+ case "UnixDomainSocketPath": return target.getUnixDomainSocketPath();
case "usebytebuf":
case "UseByteBuf": return target.isUseByteBuf();
case "usingexecutorservice":
diff --git a/components/camel-netty/src/generated/java/org/apache/camel/component/netty/NettyEndpointConfigurer.java b/components/camel-netty/src/generated/java/org/apache/camel/component/netty/NettyEndpointConfigurer.java
index d2e3bd826c1..f7d852ba161 100644
--- a/components/camel-netty/src/generated/java/org/apache/camel/component/netty/NettyEndpointConfigurer.java
+++ b/components/camel-netty/src/generated/java/org/apache/camel/component/netty/NettyEndpointConfigurer.java
@@ -145,6 +145,8 @@ public class NettyEndpointConfigurer extends PropertyConfigurerSupport implement
case "udpByteArrayCodec": target.getConfiguration().setUdpByteArrayCodec(property(camelContext, boolean.class, value)); return true;
case "udpconnectionlesssending":
case "udpConnectionlessSending": target.getConfiguration().setUdpConnectionlessSending(property(camelContext, boolean.class, value)); return true;
+ case "unixdomainsocketpath":
+ case "unixDomainSocketPath": target.getConfiguration().setUnixDomainSocketPath(property(camelContext, java.lang.String.class, value)); return true;
case "usebytebuf":
case "useByteBuf": target.getConfiguration().setUseByteBuf(property(camelContext, boolean.class, value)); return true;
case "usingexecutorservice":
@@ -284,6 +286,8 @@ public class NettyEndpointConfigurer extends PropertyConfigurerSupport implement
case "udpByteArrayCodec": return boolean.class;
case "udpconnectionlesssending":
case "udpConnectionlessSending": return boolean.class;
+ case "unixdomainsocketpath":
+ case "unixDomainSocketPath": return java.lang.String.class;
case "usebytebuf":
case "useByteBuf": return boolean.class;
case "usingexecutorservice":
@@ -424,6 +428,8 @@ public class NettyEndpointConfigurer extends PropertyConfigurerSupport implement
case "udpByteArrayCodec": return target.getConfiguration().isUdpByteArrayCodec();
case "udpconnectionlesssending":
case "udpConnectionlessSending": return target.getConfiguration().isUdpConnectionlessSending();
+ case "unixdomainsocketpath":
+ case "unixDomainSocketPath": return target.getConfiguration().getUnixDomainSocketPath();
case "usebytebuf":
case "useByteBuf": return target.getConfiguration().isUseByteBuf();
case "usingexecutorservice":
diff --git a/components/camel-netty/src/generated/java/org/apache/camel/component/netty/NettyEndpointUriFactory.java b/components/camel-netty/src/generated/java/org/apache/camel/component/netty/NettyEndpointUriFactory.java
index 55c24a8e337..fc44c058d47 100644
--- a/components/camel-netty/src/generated/java/org/apache/camel/component/netty/NettyEndpointUriFactory.java
+++ b/components/camel-netty/src/generated/java/org/apache/camel/component/netty/NettyEndpointUriFactory.java
@@ -21,7 +21,7 @@ public class NettyEndpointUriFactory extends org.apache.camel.support.component.
private static final Set<String> SECRET_PROPERTY_NAMES;
private static final Set<String> MULTI_VALUE_PREFIXES;
static {
- Set<String> props = new HashSet<>(76);
+ Set<String> props = new HashSet<>(77);
props.add("allowDefaultCodec");
props.add("allowSerializedHeaders");
props.add("autoAppendDelimiter");
@@ -94,6 +94,7 @@ public class NettyEndpointUriFactory extends org.apache.camel.support.component.
props.add("trustStoreResource");
props.add("udpByteArrayCodec");
props.add("udpConnectionlessSending");
+ props.add("unixDomainSocketPath");
props.add("useByteBuf");
props.add("usingExecutorService");
props.add("workerCount");
diff --git a/components/camel-netty/src/generated/resources/org/apache/camel/component/netty/netty.json b/components/camel-netty/src/generated/resources/org/apache/camel/component/netty/netty.json
index f1f479d6563..09bfacc2c1a 100644
--- a/components/camel-netty/src/generated/resources/org/apache/camel/component/netty/netty.json
+++ b/components/camel-netty/src/generated/resources/org/apache/camel/component/netty/netty.json
@@ -73,6 +73,7 @@
"sendBufferSize": { "kind": "property", "displayName": "Send Buffer Size", "group": "advanced", "label": "advanced", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 65536, "configurationClass": "org.apache.camel.component.netty.NettyConfiguration", "configurationField": "configuration", "description": "The TCP\/UDP buffer sizes to be used during outbound communication. Size is bytes." },
"transferExchange": { "kind": "property", "displayName": "Transfer Exchange", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.netty.NettyConfiguration", "configurationField": "configuration", "description": "Only used for TCP. You can transfer the exchange over the wire instead of just the body. The f [...]
"udpByteArrayCodec": { "kind": "property", "displayName": "Udp Byte Array Codec", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.netty.NettyConfiguration", "configurationField": "configuration", "description": "For UDP only. If enabled the using byte array codec instead of Java serialization protocol." },
+ "unixDomainSocketPath": { "kind": "property", "displayName": "Unix Domain Socket Path", "group": "advanced", "label": "advanced", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.netty.NettyConfiguration", "configurationField": "configuration", "description": "Path to unix domain socket to use instead of inet socket. Host and port parameters will not be used [...]
"workerCount": { "kind": "property", "displayName": "Worker Count", "group": "advanced", "label": "advanced", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.netty.NettyConfiguration", "configurationField": "configuration", "description": "When netty works on nio mode, it uses default workerCount parameter from Netty (which is cpu_core_threads x 2). User can use this o [...]
"workerGroup": { "kind": "property", "displayName": "Worker Group", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "io.netty.channel.EventLoopGroup", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.netty.NettyConfiguration", "configurationField": "configuration", "description": "To use a explicit EventLoopGroup as the boss thread pool. For example to share a thread pool with multip [...]
"allowDefaultCodec": { "kind": "property", "displayName": "Allow Default Codec", "group": "codec", "label": "codec", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "configurationClass": "org.apache.camel.component.netty.NettyConfiguration", "configurationField": "configuration", "description": "The netty component installs a default codec if both, encoder\/decoder is null and textline is fal [...]
@@ -165,6 +166,7 @@
"synchronous": { "kind": "parameter", "displayName": "Synchronous", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Sets whether synchronous processing should be strictly used" },
"transferExchange": { "kind": "parameter", "displayName": "Transfer Exchange", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.netty.NettyConfiguration", "configurationField": "configuration", "description": "Only used for TCP. You can transfer the exchange over the wire instead of just the body. The [...]
"udpByteArrayCodec": { "kind": "parameter", "displayName": "Udp Byte Array Codec", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.netty.NettyConfiguration", "configurationField": "configuration", "description": "For UDP only. If enabled the using byte array codec instead of Java serialization protocol." },
+ "unixDomainSocketPath": { "kind": "parameter", "displayName": "Unix Domain Socket Path", "group": "advanced", "label": "advanced", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.netty.NettyConfiguration", "configurationField": "configuration", "description": "Path to unix domain socket to use instead of inet socket. Host and port parameters will not be use [...]
"workerCount": { "kind": "parameter", "displayName": "Worker Count", "group": "advanced", "label": "advanced", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.netty.NettyConfiguration", "configurationField": "configuration", "description": "When netty works on nio mode, it uses default workerCount parameter from Netty (which is cpu_core_threads x 2). User can use this [...]
"workerGroup": { "kind": "parameter", "displayName": "Worker Group", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "io.netty.channel.EventLoopGroup", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.netty.NettyConfiguration", "configurationField": "configuration", "description": "To use a explicit EventLoopGroup as the boss thread pool. For example to share a thread pool with multi [...]
"allowDefaultCodec": { "kind": "parameter", "displayName": "Allow Default Codec", "group": "codec", "label": "codec", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "configurationClass": "org.apache.camel.component.netty.NettyConfiguration", "configurationField": "configuration", "description": "The netty component installs a default codec if both, encoder\/decoder is null and textline is fa [...]
diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
index 0683f07ad17..bcd7023f434 100644
--- a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
+++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
@@ -18,6 +18,8 @@ package org.apache.camel.component.netty;
import java.net.ConnectException;
import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.file.Path;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.RejectedExecutionException;
@@ -31,12 +33,14 @@ import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.EpollDatagramChannel;
+import io.netty.channel.epoll.EpollDomainSocketChannel;
import io.netty.channel.epoll.EpollSocketChannel;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.ChannelGroupFuture;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.socket.nio.NioDatagramChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.channel.unix.DomainSocketAddress;
import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.util.AttributeKey;
import io.netty.util.ReferenceCountUtil;
@@ -450,10 +454,14 @@ public class NettyProducer extends DefaultAsyncProducer {
if (isTcp()) {
// its okay to create a new bootstrap for each new channel
Bootstrap clientBootstrap = new Bootstrap();
- if (configuration.isNativeTransport()) {
- clientBootstrap.channel(EpollSocketChannel.class);
+ if (configuration.getUnixDomainSocketPath() != null) {
+ clientBootstrap.channel(EpollDomainSocketChannel.class);
} else {
- clientBootstrap.channel(NioSocketChannel.class);
+ if (configuration.isNativeTransport()) {
+ clientBootstrap.channel(EpollSocketChannel.class);
+ } else {
+ clientBootstrap.channel(NioSocketChannel.class);
+ }
}
clientBootstrap.group(getWorkerGroup());
clientBootstrap.option(ChannelOption.SO_KEEPALIVE, configuration.isKeepAlive());
@@ -472,11 +480,19 @@ public class NettyProducer extends DefaultAsyncProducer {
// set the pipeline factory, which creates the pipeline for each newly created channels
clientBootstrap.handler(pipelineFactory);
- answer = clientBootstrap.connect(new InetSocketAddress(configuration.getHost(), configuration.getPort()));
- if (LOG.isDebugEnabled()) {
- LOG.debug("Created new TCP client bootstrap connecting to {}:{} with options: {}",
+ SocketAddress socketAddress;
+ if (configuration.getUnixDomainSocketPath() != null) {
+ Path udsPath = Path.of(configuration.getUnixDomainSocketPath()).toAbsolutePath();
+ LOG.debug("Creating new TCP client bootstrap connecting to {} with options {}",
+ udsPath, clientBootstrap);
+ socketAddress = new DomainSocketAddress(udsPath.toFile());
+ } else {
+ LOG.debug("Creating new TCP client bootstrap connecting to {}:{} with options: {}",
configuration.getHost(), configuration.getPort(), clientBootstrap);
+ socketAddress = new InetSocketAddress(configuration.getHost(), configuration.getPort());
}
+ answer = clientBootstrap.connect(socketAddress);
+ LOG.debug("TCP client bootstrap created");
return answer;
} else {
// its okay to create a new bootstrap for each new channel
diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyServerBootstrapConfiguration.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyServerBootstrapConfiguration.java
index 960aa21d244..2959931cda7 100644
--- a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyServerBootstrapConfiguration.java
+++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyServerBootstrapConfiguration.java
@@ -111,8 +111,13 @@ public class NettyServerBootstrapConfiguration implements Cloneable {
private boolean reconnect = true;
@UriParam(label = "consumer", defaultValue = "10000")
private int reconnectInterval = 10000;
+ @UriParam(label = "advanced")
+ private String unixDomainSocketPath;
public String getAddress() {
+ if (unixDomainSocketPath != null) {
+ return unixDomainSocketPath;
+ }
return host + ":" + port;
}
@@ -565,6 +570,18 @@ public class NettyServerBootstrapConfiguration implements Cloneable {
this.reconnectInterval = reconnectInterval;
}
+ public String getUnixDomainSocketPath() {
+ return unixDomainSocketPath;
+ }
+
+ /**
+ * Path to unix domain socket to use instead of inet socket. Host and port parameters will not be used, however
+ * they are still required, so dummy values are fine. Must be used with nativeTransport=true and clientMode=false.
+ */
+ public void setUnixDomainSocketPath(String unixDomainSocketPath) {
+ this.unixDomainSocketPath = unixDomainSocketPath;
+ }
+
/**
* Checks if the other {@link NettyServerBootstrapConfiguration} is compatible with this, as a Netty listener bound
* on port X shares the same common {@link NettyServerBootstrapConfiguration}, which must be identical.
@@ -647,6 +664,8 @@ public class NettyServerBootstrapConfiguration implements Cloneable {
isCompatible = false;
} else if (reconnectInterval != other.reconnectInterval) {
isCompatible = false;
+ } else if (!java.util.Objects.equals(unixDomainSocketPath, other.unixDomainSocketPath)) { // by value, null-safe
+ isCompatible = false;
}
return isCompatible;
@@ -688,6 +707,7 @@ public class NettyServerBootstrapConfiguration implements Cloneable {
+ ", networkInterface='" + networkInterface + '\''
+ ", reconnect='" + reconnect + '\''
+ ", reconnectInterval='" + reconnectInterval + '\''
+ + ", unixDomainSocketPath='" + unixDomainSocketPath + '\''
+ '}';
}
}
diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/SingleTCPNettyServerBootstrapFactory.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/SingleTCPNettyServerBootstrapFactory.java
index 44bb693e155..f43c9cae716 100644
--- a/components/camel-netty/src/main/java/org/apache/camel/component/netty/SingleTCPNettyServerBootstrapFactory.java
+++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/SingleTCPNettyServerBootstrapFactory.java
@@ -17,6 +17,8 @@
package org.apache.camel.component.netty;
import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.file.Path;
import java.util.Map;
import java.util.concurrent.ThreadFactory;
@@ -26,10 +28,12 @@ import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
+import io.netty.channel.epoll.EpollServerDomainSocketChannel;
import io.netty.channel.epoll.EpollServerSocketChannel;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.channel.unix.DomainSocketAddress;
import io.netty.util.concurrent.ImmediateEventExecutor;
import org.apache.camel.CamelContext;
import org.apache.camel.support.CamelContextHelper;
@@ -143,10 +147,14 @@ public class SingleTCPNettyServerBootstrapFactory extends ServiceSupport impleme
}
serverBootstrap = new ServerBootstrap();
- if (configuration.isNativeTransport()) {
- serverBootstrap.group(bg, wg).channel(EpollServerSocketChannel.class);
+ if (configuration.getUnixDomainSocketPath() != null) {
+ serverBootstrap.group(bg, wg).channel(EpollServerDomainSocketChannel.class);
} else {
- serverBootstrap.group(bg, wg).channel(NioServerSocketChannel.class);
+ if (configuration.isNativeTransport()) {
+ serverBootstrap.group(bg, wg).channel(EpollServerSocketChannel.class);
+ } else {
+ serverBootstrap.group(bg, wg).channel(NioServerSocketChannel.class);
+ }
}
serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, configuration.isKeepAlive());
serverBootstrap.childOption(ChannelOption.TCP_NODELAY, configuration.isTcpNoDelay());
@@ -179,9 +187,16 @@ public class SingleTCPNettyServerBootstrapFactory extends ServiceSupport impleme
LOG.debug("Created ServerBootstrap {}", serverBootstrap);
- LOG.info("ServerBootstrap binding to {}:{}", configuration.getHost(), configuration.getPort());
- ChannelFuture channelFuture
- = serverBootstrap.bind(new InetSocketAddress(configuration.getHost(), configuration.getPort())).sync();
+ SocketAddress socketAddress;
+ if (configuration.getUnixDomainSocketPath() != null) {
+ Path udsPath = Path.of(configuration.getUnixDomainSocketPath()).toAbsolutePath();
+ LOG.info("ServerBootstrap binding to {}", udsPath);
+ socketAddress = new DomainSocketAddress(udsPath.toFile());
+ } else {
+ LOG.info("ServerBootstrap binding to {}:{}", configuration.getHost(), configuration.getPort());
+ socketAddress = new InetSocketAddress(configuration.getHost(), configuration.getPort());
+ }
+ ChannelFuture channelFuture = serverBootstrap.bind(socketAddress).sync();
channel = channelFuture.channel();
// to keep track of all channels in use
allChannels.add(channel);
@@ -189,7 +204,13 @@ public class SingleTCPNettyServerBootstrapFactory extends ServiceSupport impleme
protected void stopServerBootstrap() {
// close all channels
- LOG.info("ServerBootstrap unbinding from {}:{}", configuration.getHost(), configuration.getPort());
+ if (configuration.getUnixDomainSocketPath() != null) {
+ Path udsPath = Path.of(configuration.getUnixDomainSocketPath()).toAbsolutePath();
+ LOG.info("ServerBootstrap unbinding from {}", udsPath);
+ } else {
+ LOG.info("ServerBootstrap unbinding from {}:{}", configuration.getHost(), configuration.getPort());
+
+ }
LOG.trace("Closing {} channels", allChannels.size());
allChannels.close().awaitUninterruptibly();
diff --git a/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTCPSyncUDSTest.java b/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTCPSyncUDSTest.java
new file mode 100644
index 00000000000..e9db61eb991
--- /dev/null
+++ b/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTCPSyncUDSTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Sync TCP request/reply over a unix domain socket (host and port in the URIs are dummies). To run it, add
+ * {@code <classifier>linux-x86_64</classifier>} to the io.netty:netty-transport-native-epoll dependency.
+ */
+@Disabled("Requires native library to load, can be run manually")
+public class NettyTCPSyncUDSTest extends BaseNettyTest {
+
+ @Test
+ public void test() {
+ String response = template.requestBody(
+ "netty:tcp://dummy:0?sync=true&nativeTransport=true&unixDomainSocketPath=target/test.sock",
+ "Epitaph in Kohima, India marking the WWII Battle of Kohima and Imphal, Burma Campaign - Attributed to John Maxwell Edmonds",
+ String.class);
+ assertEquals("When You Go Home, Tell Them Of Us And Say, For Your Tomorrow, We Gave Our Today.", response);
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() {
+ return new RouteBuilder() {
+ @Override
+ public void configure() {
+ from("netty:tcp://dummy:0?sync=true&nativeTransport=true&unixDomainSocketPath=target/test.sock")
+ .process(new Processor() {
+ public void process(Exchange exchange) {
+ exchange.getMessage().setBody(
+ "When You Go Home, Tell Them Of Us And Say, For Your Tomorrow, We Gave Our Today.");
+ }
+ });
+ }
+ };
+ }
+}