You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/01/05 09:32:19 UTC
[pulsar] branch master updated: issue #3276 Pulsar IO connector for
udp source (#3295)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 1d22571 issue #3276 Pulsar IO connector for udp source (#3295)
1d22571 is described below
commit 1d2257110ed30012096ec5ef783805d0f1f09a78
Author: wpl <12...@qq.com>
AuthorDate: Sat Jan 5 17:32:14 2019 +0800
issue #3276 Pulsar IO connector for udp source (#3295)
### Motivation
Fixes #3276
### Result
Pulsar IO connector for udp source
---
.../org/apache/pulsar/io/netty/NettySource.java | 83 ++++++++++
...TcpSourceConfig.java => NettySourceConfig.java} | 18 ++-
.../org/apache/pulsar/io/netty/NettyTcpSource.java | 77 ----------
.../{tcp => }/server/NettyChannelInitializer.java | 2 +-
.../apache/pulsar/io/netty/server/NettyServer.java | 169 +++++++++++++++++++++
.../NettyServerHandler.java} | 18 +--
.../pulsar/io/netty/tcp/server/NettyTcpServer.java | 123 ---------------
.../resources/META-INF/services/pulsar-io.yaml | 6 +-
...eConfigTest.java => NettySourceConfigTest.java} | 36 +++--
.../server/NettyChannelInitializerTest.java | 6 +-
.../NettyServerTest.java} | 96 +++++++++---
...TcpSourceConfig.yaml => nettySourceConfig.yaml} | 3 +-
...l => nettySourceConfigWithInvalidProperty.yaml} | 3 +-
site2/docs/io-connectors.md | 2 +-
site2/docs/{io-tcp.md => io-netty.md} | 9 +-
15 files changed, 383 insertions(+), 268 deletions(-)
diff --git a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/NettySource.java b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/NettySource.java
new file mode 100644
index 0000000..215bd34
--- /dev/null
+++ b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/NettySource.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.netty;
+
+import org.apache.pulsar.io.core.PushSource;
+import org.apache.pulsar.io.core.SourceContext;
+import org.apache.pulsar.io.core.annotations.Connector;
+import org.apache.pulsar.io.core.annotations.IOType;
+import org.apache.pulsar.io.netty.server.NettyServer;
+import java.util.Map;
+
+/**
+ * A simple Netty Tcp or Udp Source connector that listens for Tcp/Udp messages and writes them to a user-defined Pulsar topic
+ */
+@Connector(
+ name = "netty",
+ type = IOType.SOURCE,
+ help = "A simple Netty Tcp or Udp Source connector to listen Tcp/Udp messages and write to user-defined Pulsar topic",
+ configClass = NettySourceConfig.class)
+public class NettySource extends PushSource<byte[]> {
+
+ private NettyServer nettyServer;
+ private Thread thread;
+
+ @Override
+ public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
+ NettySourceConfig nettySourceConfig = NettySourceConfig.load(config);
+ if (nettySourceConfig.getType() == null
+ || nettySourceConfig.getHost() == null
+ || nettySourceConfig.getPort() <= 0) {
+ throw new IllegalArgumentException("Required property not set.");
+ }
+
+ thread = new Thread(new PulsarServerRunnable(nettySourceConfig, this));
+ thread.start();
+ }
+
+ @Override
+ public void close() throws Exception {
+ nettyServer.shutdownGracefully();
+ }
+
+ private class PulsarServerRunnable implements Runnable {
+
+ private NettySourceConfig nettySourceConfig;
+ private NettySource nettySource;
+
+ public PulsarServerRunnable(NettySourceConfig nettySourceConfig, NettySource nettySource) {
+ this.nettySourceConfig = nettySourceConfig;
+ this.nettySource = nettySource;
+ }
+
+ @Override
+ public void run() {
+ nettyServer = new NettyServer.Builder()
+ .setType(NettyServer.Type.valueOf(nettySourceConfig.getType().toUpperCase()))
+ .setHost(nettySourceConfig.getHost())
+ .setPort(nettySourceConfig.getPort())
+ .setNumberOfThreads(nettySourceConfig.getNumberOfThreads())
+ .setNettySource(nettySource)
+ .build();
+
+ nettyServer.run();
+ }
+ }
+
+}
diff --git a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/NettyTcpSourceConfig.java b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/NettySourceConfig.java
similarity index 79%
rename from pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/NettyTcpSourceConfig.java
rename to pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/NettySourceConfig.java
index 07b3cf8..f5d40e9 100644
--- a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/NettyTcpSourceConfig.java
+++ b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/NettySourceConfig.java
@@ -30,7 +30,7 @@ import java.io.Serializable;
import java.util.Map;
/**
- * Netty Tcp Source Connector Config.
+ * Netty Tcp or Udp Source Connector Config.
*/
@Data
@Setter
@@ -38,12 +38,18 @@ import java.util.Map;
@EqualsAndHashCode
@ToString
@Accessors(chain = true)
-public class NettyTcpSourceConfig implements Serializable {
+public class NettySourceConfig implements Serializable {
private static final long serialVersionUID = -7116130435021510496L;
@FieldDoc(
required = true,
+ defaultValue = "tcp",
+ help = "The tcp or udp network protocols")
+ private String type = "tcp";
+
+ @FieldDoc(
+ required = true,
defaultValue = "127.0.0.1",
help = "The host name or address that the source instance to listen on")
private String host = "127.0.0.1";
@@ -61,14 +67,14 @@ public class NettyTcpSourceConfig implements Serializable {
"handle the traffic of the accepted connections")
private int numberOfThreads = 1;
- public static NettyTcpSourceConfig load(Map<String, Object> map) throws IOException {
+ public static NettySourceConfig load(Map<String, Object> map) throws IOException {
ObjectMapper mapper = new ObjectMapper();
- return mapper.readValue(new ObjectMapper().writeValueAsString(map), NettyTcpSourceConfig.class);
+ return mapper.readValue(new ObjectMapper().writeValueAsString(map), NettySourceConfig.class);
}
- public static NettyTcpSourceConfig load(String yamlFile) throws IOException {
+ public static NettySourceConfig load(String yamlFile) throws IOException {
ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
- return mapper.readValue(new File(yamlFile), NettyTcpSourceConfig.class);
+ return mapper.readValue(new File(yamlFile), NettySourceConfig.class);
}
}
diff --git a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/NettyTcpSource.java b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/NettyTcpSource.java
deleted file mode 100644
index 3cef730..0000000
--- a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/NettyTcpSource.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.io.netty;
-
-import org.apache.pulsar.io.core.PushSource;
-import org.apache.pulsar.io.core.SourceContext;
-import org.apache.pulsar.io.core.annotations.Connector;
-import org.apache.pulsar.io.core.annotations.IOType;
-import org.apache.pulsar.io.netty.tcp.server.NettyTcpServer;
-import java.util.Map;
-
-/**
- * A simple Netty Tcp Source connector to listen Tcp messages and write to user-defined Pulsar topic
- */
-@Connector(
- name = "tcp",
- type = IOType.SOURCE,
- help = "A simple Netty Tcp Source connector to listen Tcp messages and write to user-defined Pulsar topic",
- configClass = NettyTcpSourceConfig.class)
-public class NettyTcpSource extends PushSource<byte[]> {
-
- private NettyTcpServer nettyTcpServer;
- private Thread thread;
-
- @Override
- public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
- NettyTcpSourceConfig nettyTcpSourceConfig = NettyTcpSourceConfig.load(config);
-
- thread = new Thread(new PulsarTcpServerRunnable(nettyTcpSourceConfig, this));
- thread.start();
- }
-
- @Override
- public void close() throws Exception {
- nettyTcpServer.shutdownGracefully();
- }
-
- private class PulsarTcpServerRunnable implements Runnable {
-
- private NettyTcpSourceConfig nettyTcpSourceConfig;
- private NettyTcpSource nettyTcpSource;
-
- public PulsarTcpServerRunnable(NettyTcpSourceConfig nettyTcpSourceConfig, NettyTcpSource nettyTcpSource) {
- this.nettyTcpSourceConfig = nettyTcpSourceConfig;
- this.nettyTcpSource = nettyTcpSource;
- }
-
- @Override
- public void run() {
- nettyTcpServer = new NettyTcpServer.Builder()
- .setHost(nettyTcpSourceConfig.getHost())
- .setPort(nettyTcpSourceConfig.getPort())
- .setNumberOfThreads(nettyTcpSourceConfig.getNumberOfThreads())
- .setNettyTcpSource(nettyTcpSource)
- .build();
-
- nettyTcpServer.run();
- }
- }
-
-}
diff --git a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/tcp/server/NettyChannelInitializer.java b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyChannelInitializer.java
similarity index 97%
rename from pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/tcp/server/NettyChannelInitializer.java
rename to pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyChannelInitializer.java
index f88f514..b5fa820 100644
--- a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/tcp/server/NettyChannelInitializer.java
+++ b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyChannelInitializer.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.io.netty.tcp.server;
+package org.apache.pulsar.io.netty.server;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
diff --git a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyServer.java b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyServer.java
new file mode 100644
index 0000000..2cb8031
--- /dev/null
+++ b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyServer.java
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.netty.server;
+
+import com.google.common.base.Preconditions;
+import io.netty.bootstrap.Bootstrap;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioDatagramChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.io.netty.NettySource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Netty Tcp or Udp Server to accept any incoming data through Tcp or Udp.
+ */
+public class NettyServer {
+
+ private static final Logger logger = LoggerFactory.getLogger(NettyServer.class);
+
+ private Type type;
+ private String host;
+ private int port;
+ private NettySource nettySource;
+ private int numberOfThreads;
+ private EventLoopGroup bossGroup;
+ private EventLoopGroup workerGroup;
+
+ private NettyServer(Builder builder) {
+ this.type = builder.type;
+ this.host = builder.host;
+ this.port = builder.port;
+ this.nettySource = builder.nettySource;
+ this.numberOfThreads = builder.numberOfThreads;
+ }
+
+ public void run() {
+ try {
+ switch (type) {
+ case TCP:
+ runTcp();
+ break;
+ case UDP:
+ runUdp();
+ break;
+ default:
+ runTcp();
+ break;
+ }
+ } catch(Exception ex) {
+ logger.error("Error occurred when Netty Tcp or Udp Server is running", ex);
+ } finally {
+ shutdownGracefully();
+ }
+ }
+
+ public void shutdownGracefully() {
+ if (workerGroup != null)
+ workerGroup.shutdownGracefully();
+ if (bossGroup != null)
+ bossGroup.shutdownGracefully();
+ }
+
+ private void runUdp() throws InterruptedException {
+ workerGroup = new NioEventLoopGroup(this.numberOfThreads);
+ Bootstrap bootstrap = new Bootstrap();
+ bootstrap.group(workerGroup);
+ bootstrap.channel(NioDatagramChannel.class);
+ bootstrap.handler(new NettyChannelInitializer(new NettyServerHandler(this.nettySource)))
+ .option(ChannelOption.SO_BACKLOG, 1024);
+
+ ChannelFuture channelFuture = bootstrap.bind(this.host, this.port).sync();
+ channelFuture.channel().closeFuture().sync();
+ }
+
+ private void runTcp() throws InterruptedException {
+ bossGroup = new NioEventLoopGroup(this.numberOfThreads);
+ workerGroup = new NioEventLoopGroup(this.numberOfThreads);
+ ServerBootstrap serverBootstrap = new ServerBootstrap();
+ serverBootstrap.group(bossGroup, workerGroup);
+ serverBootstrap.channel(NioServerSocketChannel.class);
+ serverBootstrap.childHandler(new NettyChannelInitializer(new NettyServerHandler(this.nettySource)))
+ .option(ChannelOption.SO_BACKLOG, 1024)
+ .childOption(ChannelOption.SO_KEEPALIVE, true);
+
+ ChannelFuture channelFuture = serverBootstrap.bind(this.host, this.port).sync();
+ channelFuture.channel().closeFuture().sync();
+ }
+
+ /**
+ * Pulsar Netty Server Builder.
+ */
+ public static class Builder {
+
+ private Type type;
+ private String host;
+ private int port;
+ private NettySource nettySource;
+ private int numberOfThreads;
+
+ public Builder setType(Type type) {
+ this.type = type;
+ return this;
+ }
+
+ public Builder setHost(String host) {
+ this.host = host;
+ return this;
+ }
+
+ public Builder setPort(int port) {
+ this.port = port;
+ return this;
+ }
+
+ public Builder setNettySource(NettySource nettySource) {
+ this.nettySource = nettySource;
+ return this;
+ }
+
+ public Builder setNumberOfThreads(int numberOfThreads) {
+ this.numberOfThreads = numberOfThreads;
+ return this;
+ }
+
+ public NettyServer build() {
+ Preconditions.checkNotNull(this.type, "type cannot be blank/null");
+ Preconditions.checkArgument(StringUtils.isNotBlank(host), "host cannot be blank/null");
+ Preconditions.checkArgument(this.port >= 1024, "port must be set equal or bigger than 1024");
+ Preconditions.checkNotNull(this.nettySource, "nettySource must be set");
+ Preconditions.checkArgument(this.numberOfThreads > 0,
+ "numberOfThreads must be set as positive");
+
+ return new NettyServer(this);
+ }
+ }
+
+ /**
+ * The network protocol the server listens on: tcp or udp.
+ */
+ public enum Type {
+
+ TCP,
+
+ UDP
+ }
+
+}
diff --git a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/tcp/server/NettyTcpServerHandler.java b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyServerHandler.java
similarity index 74%
rename from pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/tcp/server/NettyTcpServerHandler.java
rename to pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyServerHandler.java
index 2fd2cf3..81fd203 100644
--- a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/tcp/server/NettyTcpServerHandler.java
+++ b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyServerHandler.java
@@ -16,12 +16,12 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.io.netty.tcp.server;
+package org.apache.pulsar.io.netty.server;
import io.netty.channel.*;
import lombok.Data;
import org.apache.pulsar.functions.api.Record;
-import org.apache.pulsar.io.netty.NettyTcpSource;
+import org.apache.pulsar.io.netty.NettySource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -32,19 +32,19 @@ import java.util.Optional;
* Handles a server-side channel
*/
@ChannelHandler.Sharable
-public class NettyTcpServerHandler extends SimpleChannelInboundHandler<byte[]> {
+public class NettyServerHandler extends SimpleChannelInboundHandler<byte[]> {
- private static final Logger logger = LoggerFactory.getLogger(NettyTcpServerHandler.class);
+ private static final Logger logger = LoggerFactory.getLogger(NettyServerHandler.class);
- private NettyTcpSource nettyTcpSource;
+ private NettySource nettySource;
- public NettyTcpServerHandler(NettyTcpSource nettyTcpSource) {
- this.nettyTcpSource = nettyTcpSource;
+ public NettyServerHandler(NettySource nettySource) {
+ this.nettySource = nettySource;
}
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, byte[] bytes) throws Exception {
- nettyTcpSource.consume(new NettyTcpRecord(Optional.ofNullable(""), bytes));
+ nettySource.consume(new NettyRecord(Optional.ofNullable(""), bytes));
}
@Override
@@ -54,7 +54,7 @@ public class NettyTcpServerHandler extends SimpleChannelInboundHandler<byte[]> {
}
@Data
- static private class NettyTcpRecord implements Record<byte[]>, Serializable {
+ static private class NettyRecord implements Record<byte[]>, Serializable {
private final Optional<String> key;
private final byte[] value;
}
diff --git a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/tcp/server/NettyTcpServer.java b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/tcp/server/NettyTcpServer.java
deleted file mode 100644
index b471b03..0000000
--- a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/tcp/server/NettyTcpServer.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.io.netty.tcp.server;
-
-import com.google.common.base.Preconditions;
-import io.netty.bootstrap.ServerBootstrap;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelOption;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.channel.socket.nio.NioServerSocketChannel;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.pulsar.io.netty.NettyTcpSource;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Netty Tcp Server to accept any incoming data through Tcp.
- */
-public class NettyTcpServer {
-
- private static final Logger logger = LoggerFactory.getLogger(NettyTcpServer.class);
-
- private String host;
- private int port;
- private NettyTcpSource nettyTcpSource;
- private int numberOfThreads;
- private EventLoopGroup bossGroup;
- private EventLoopGroup workerGroup;
-
- private NettyTcpServer(Builder builder) {
- this.host = builder.host;
- this.port = builder.port;
- this.nettyTcpSource = builder.nettyTcpSource;
- this.numberOfThreads = builder.numberOfThreads;
- }
-
- public void run() {
- try {
- bossGroup = new NioEventLoopGroup(this.numberOfThreads);
- workerGroup = new NioEventLoopGroup(this.numberOfThreads);
-
- ServerBootstrap serverBootstrap = new ServerBootstrap();
- serverBootstrap.group(bossGroup, workerGroup)
- .channel(NioServerSocketChannel.class)
- .childHandler(new NettyChannelInitializer(new NettyTcpServerHandler(this.nettyTcpSource)))
- .option(ChannelOption.SO_BACKLOG, 1024)
- .childOption(ChannelOption.SO_KEEPALIVE, true);
-
- ChannelFuture channelFuture = serverBootstrap.bind(this.host, this.port).sync();
- channelFuture.channel().closeFuture().sync();
- } catch(Exception ex) {
- logger.error("Error occurred when Netty Tcp Server is running", ex);
- } finally {
- shutdownGracefully();
- }
- }
-
- public void shutdownGracefully() {
- if (workerGroup != null)
- workerGroup.shutdownGracefully();
- if (bossGroup != null)
- bossGroup.shutdownGracefully();
- }
-
- /**
- * Pulsar Tcp Server Builder.
- */
- public static class Builder {
-
- private String host;
- private int port;
- private NettyTcpSource nettyTcpSource;
- private int numberOfThreads;
-
- public Builder setHost(String host) {
- this.host = host;
- return this;
- }
-
- public Builder setPort(int port) {
- this.port = port;
- return this;
- }
-
- public Builder setNettyTcpSource(NettyTcpSource nettyTcpSource) {
- this.nettyTcpSource = nettyTcpSource;
- return this;
- }
-
- public Builder setNumberOfThreads(int numberOfThreads) {
- this.numberOfThreads = numberOfThreads;
- return this;
- }
-
- public NettyTcpServer build() {
- Preconditions.checkArgument(StringUtils.isNotBlank(host), "host cannot be blank/null");
- Preconditions.checkArgument(this.port >= 1024, "port must be set equal or bigger than 1024");
- Preconditions.checkNotNull(this.nettyTcpSource, "nettyTcpSource must be set");
- Preconditions.checkArgument(this.numberOfThreads > 0,
- "numberOfThreads must be set as positive");
-
- return new NettyTcpServer(this);
- }
- }
-
-}
\ No newline at end of file
diff --git a/pulsar-io/netty/src/main/resources/META-INF/services/pulsar-io.yaml b/pulsar-io/netty/src/main/resources/META-INF/services/pulsar-io.yaml
index 58744ce..22c4af4 100644
--- a/pulsar-io/netty/src/main/resources/META-INF/services/pulsar-io.yaml
+++ b/pulsar-io/netty/src/main/resources/META-INF/services/pulsar-io.yaml
@@ -17,6 +17,6 @@
# under the License.
#
-name: tcp
-description: Netty Tcp Source Connector
-sourceClass: org.apache.pulsar.io.netty.NettyTcpSource
+name: netty
+description: Netty Tcp or Udp Source Connector
+sourceClass: org.apache.pulsar.io.netty.NettySource
diff --git a/pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/NettyTcpSourceConfigTest.java b/pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/NettySourceConfigTest.java
similarity index 65%
rename from pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/NettyTcpSourceConfigTest.java
rename to pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/NettySourceConfigTest.java
index efc763a..034fa79 100644
--- a/pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/NettyTcpSourceConfigTest.java
+++ b/pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/NettySourceConfigTest.java
@@ -30,24 +30,27 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
/**
- * Tests for Netty Tcp Source Config
+ * Tests for Netty Tcp or Udp Source Config
*/
-public class NettyTcpSourceConfigTest {
+public class NettySourceConfigTest {
private static final String LOCALHOST = "127.0.0.1";
+ private static final String TCP = "tcp";
@Test
public void testNettyTcpConfigLoadWithMap() throws IOException {
Map<String, Object> map = new HashMap<>();
+ map.put("type", TCP);
map.put("host", LOCALHOST);
map.put("port", 10999);
map.put("numberOfThreads", 1);
- NettyTcpSourceConfig nettyTcpSourceConfig = NettyTcpSourceConfig.load(map);
- assertNotNull(nettyTcpSourceConfig);
- assertEquals(LOCALHOST, nettyTcpSourceConfig.getHost());
- assertEquals(10999, nettyTcpSourceConfig.getPort());
- assertEquals(1, nettyTcpSourceConfig.getNumberOfThreads());
+ NettySourceConfig nettySourceConfig = NettySourceConfig.load(map);
+ assertNotNull(nettySourceConfig);
+ assertEquals(TCP, nettySourceConfig.getType());
+ assertEquals(LOCALHOST, nettySourceConfig.getHost());
+ assertEquals(10999, nettySourceConfig.getPort());
+ assertEquals(1, nettySourceConfig.getNumberOfThreads());
}
@Test(expected = UnrecognizedPropertyException.class)
@@ -55,23 +58,24 @@ public class NettyTcpSourceConfigTest {
Map<String, Object> map = new HashMap<>();
map.put("invalidProperty", 1);
- NettyTcpSourceConfig.load(map);
+ NettySourceConfig.load(map);
}
@Test
public void testNettyTcpConfigLoadWithYamlFile() throws IOException {
- File yamlFile = getFile("nettyTcpSourceConfig.yaml");
- NettyTcpSourceConfig nettyTcpSourceConfig = NettyTcpSourceConfig.load(yamlFile.getAbsolutePath());
- assertNotNull(nettyTcpSourceConfig);
- assertEquals(LOCALHOST, nettyTcpSourceConfig.getHost());
- assertEquals(10911, nettyTcpSourceConfig.getPort());
- assertEquals(5, nettyTcpSourceConfig.getNumberOfThreads());
+ File yamlFile = getFile("nettySourceConfig.yaml");
+ NettySourceConfig nettySourceConfig = NettySourceConfig.load(yamlFile.getAbsolutePath());
+ assertNotNull(nettySourceConfig);
+ assertEquals(TCP, nettySourceConfig.getType());
+ assertEquals(LOCALHOST, nettySourceConfig.getHost());
+ assertEquals(10911, nettySourceConfig.getPort());
+ assertEquals(5, nettySourceConfig.getNumberOfThreads());
}
@Test(expected = UnrecognizedPropertyException.class)
public void testNettyTcpConfigLoadWithYamlFileWhenInvalidPropertyIsSet() throws IOException {
- File yamlFile = getFile("nettyTcpSourceConfigWithInvalidProperty.yaml");
- NettyTcpSourceConfig.load(yamlFile.getAbsolutePath());
+ File yamlFile = getFile("nettySourceConfigWithInvalidProperty.yaml");
+ NettySourceConfig.load(yamlFile.getAbsolutePath());
}
private File getFile(String name) {
diff --git a/pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/tcp/server/NettyChannelInitializerTest.java b/pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/server/NettyChannelInitializerTest.java
similarity index 89%
rename from pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/tcp/server/NettyChannelInitializerTest.java
rename to pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/server/NettyChannelInitializerTest.java
index a5243e6..b8d6dd4 100644
--- a/pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/tcp/server/NettyChannelInitializerTest.java
+++ b/pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/server/NettyChannelInitializerTest.java
@@ -16,10 +16,10 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.io.netty.tcp.server;
+package org.apache.pulsar.io.netty.server;
import io.netty.channel.socket.nio.NioSocketChannel;
-import org.apache.pulsar.io.netty.NettyTcpSource;
+import org.apache.pulsar.io.netty.NettySource;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
@@ -35,7 +35,7 @@ public class NettyChannelInitializerTest {
NioSocketChannel channel = new NioSocketChannel();
NettyChannelInitializer nettyChannelInitializer = new NettyChannelInitializer(
- new NettyTcpServerHandler(new NettyTcpSource()));
+ new NettyServerHandler(new NettySource()));
nettyChannelInitializer.initChannel(channel);
assertNotNull(channel.pipeline().toMap());
diff --git a/pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/tcp/server/NettyTcpServerTest.java b/pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/server/NettyServerTest.java
similarity index 51%
rename from pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/tcp/server/NettyTcpServerTest.java
rename to pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/server/NettyServerTest.java
index 0c2f56b..8041570 100644
--- a/pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/tcp/server/NettyTcpServerTest.java
+++ b/pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/server/NettyServerTest.java
@@ -16,27 +16,71 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.io.netty.tcp.server;
+package org.apache.pulsar.io.netty.server;
-import org.apache.pulsar.io.netty.NettyTcpSource;
+import org.apache.pulsar.io.netty.NettySource;
+import org.apache.pulsar.io.netty.NettySourceConfig;
import org.junit.Test;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
/**
- * Tests for Netty Tcp Server
+ * Tests for Netty Tcp or Udp Server
*/
-public class NettyTcpServerTest {
+public class NettyServerTest {
private static final String LOCALHOST = "127.0.0.1";
+ private static final String TCP = "TCP";
+ private static final String UDP = "UDP";
@Test
public void testNettyTcpServerConstructor() {
- NettyTcpServer nettyTcpServer = new NettyTcpServer.Builder()
+ NettyServer nettyTcpServer = new NettyServer.Builder()
+ .setType(NettyServer.Type.valueOf(TCP))
.setHost(LOCALHOST)
.setPort(10999)
.setNumberOfThreads(2)
- .setNettyTcpSource(new NettyTcpSource())
+ .setNettySource(new NettySource())
+ .build();
+
+ assertNotNull(nettyTcpServer);
+ }
+
+ @Test
+ public void testNettyUdpServerConstructor() {
+ NettyServer nettyUdpServer = new NettyServer.Builder()
+ .setType(NettyServer.Type.valueOf(UDP))
+ .setHost(LOCALHOST)
+ .setPort(10999)
+ .setNumberOfThreads(2)
+ .setNettySource(new NettySource())
+ .build();
+
+ assertNotNull(nettyUdpServer);
+ }
+
+ @Test
+ public void testNettyTcpServerByNettySourceConfig() throws IOException {
+ Map<String, Object> map = new HashMap<>();
+ map.put("type", "tcp");
+ map.put("host", LOCALHOST);
+ map.put("port", 10999);
+ map.put("numberOfThreads", 1);
+
+ NettySourceConfig nettySourceConfig = NettySourceConfig.load(map);
+
+ // Build the NettyServer from a loaded NettySourceConfig, the same way NettySource's run() does
+ NettyServer nettyTcpServer = new NettyServer.Builder()
+ .setType(NettyServer.Type.valueOf(nettySourceConfig.getType().toUpperCase()))
+ .setHost(nettySourceConfig.getHost())
+ .setPort(nettySourceConfig.getPort())
+ .setNumberOfThreads(nettySourceConfig.getNumberOfThreads())
+ .setNettySource(new NettySource())
.build();
assertNotNull(nettyTcpServer);
@@ -44,36 +88,38 @@ public class NettyTcpServerTest {
@Test(expected = IllegalArgumentException.class)
public void testNettyTcpServerConstructorWhenHostIsNotSet() {
- new NettyTcpServer.Builder()
+ new NettyServer.Builder()
+ .setType(NettyServer.Type.valueOf(TCP))
.setPort(10999)
.setNumberOfThreads(2)
- .setNettyTcpSource(new NettyTcpSource())
+ .setNettySource(new NettySource())
.build();
}
@Test(expected = IllegalArgumentException.class)
public void testNettyTcpServerConstructorWhenPortIsNotSet() {
- new NettyTcpServer.Builder()
+ new NettyServer.Builder()
+ .setType(NettyServer.Type.valueOf(TCP))
.setHost(LOCALHOST)
.setNumberOfThreads(2)
- .setNettyTcpSource(new NettyTcpSource())
+ .setNettySource(new NettySource())
.build();
}
-
@Test(expected = IllegalArgumentException.class)
public void testNettyTcpServerConstructorWhenNumberOfThreadsIsNotSet() {
- new NettyTcpServer.Builder()
+ new NettyServer.Builder()
+ .setType(NettyServer.Type.valueOf(TCP))
.setHost(LOCALHOST)
.setPort(10999)
- .setNettyTcpSource(new NettyTcpSource())
+ .setNettySource(new NettySource())
.build();
}
-
@Test(expected = NullPointerException.class)
public void testNettyTcpServerConstructorWhenNettyTcpSourceIsNotSet() {
- new NettyTcpServer.Builder()
+ new NettyServer.Builder()
+ .setType(NettyServer.Type.valueOf(TCP))
.setHost(LOCALHOST)
.setPort(10999)
.setNumberOfThreads(2)
@@ -82,41 +128,45 @@ public class NettyTcpServerTest {
@Test(expected = IllegalArgumentException.class)
public void testNettyTcpServerWhenHostIsSetAsBlank() {
- new NettyTcpServer.Builder()
+ new NettyServer.Builder()
+ .setType(NettyServer.Type.valueOf(TCP))
.setHost(" ")
.setPort(10999)
.setNumberOfThreads(2)
- .setNettyTcpSource(new NettyTcpSource())
+ .setNettySource(new NettySource())
.build();
}
@Test(expected = IllegalArgumentException.class)
public void testNettyTcpServerWhenPortIsSetAsZero() {
- new NettyTcpServer.Builder()
+ new NettyServer.Builder()
+ .setType(NettyServer.Type.valueOf(TCP))
.setHost(LOCALHOST)
.setPort(0)
.setNumberOfThreads(2)
- .setNettyTcpSource(new NettyTcpSource())
+ .setNettySource(new NettySource())
.build();
}
@Test(expected = IllegalArgumentException.class)
public void testNettyTcpServerWhenPortIsSetLowerThan1024() {
- new NettyTcpServer.Builder()
+ new NettyServer.Builder()
+ .setType(NettyServer.Type.valueOf(TCP))
.setHost(LOCALHOST)
.setPort(1022)
.setNumberOfThreads(2)
- .setNettyTcpSource(new NettyTcpSource())
+ .setNettySource(new NettySource())
.build();
}
@Test(expected = IllegalArgumentException.class)
public void testNettyTcpServerWhenNumberOfThreadsIsSetAsZero() {
- new NettyTcpServer.Builder()
+ new NettyServer.Builder()
+ .setType(NettyServer.Type.valueOf(TCP))
.setHost(LOCALHOST)
.setPort(10999)
.setNumberOfThreads(0)
- .setNettyTcpSource(new NettyTcpSource())
+ .setNettySource(new NettySource())
.build();
}
diff --git a/pulsar-io/netty/src/test/resources/nettyTcpSourceConfig.yaml b/pulsar-io/netty/src/test/resources/nettySourceConfig.yaml
similarity index 98%
rename from pulsar-io/netty/src/test/resources/nettyTcpSourceConfig.yaml
rename to pulsar-io/netty/src/test/resources/nettySourceConfig.yaml
index ca748cc..eff1a9c 100644
--- a/pulsar-io/netty/src/test/resources/nettyTcpSourceConfig.yaml
+++ b/pulsar-io/netty/src/test/resources/nettySourceConfig.yaml
@@ -18,7 +18,8 @@
#
{
+"type": "tcp",
"host": "127.0.0.1",
"port": "10911",
"numberOfThreads": "5"
-}
\ No newline at end of file
+}
diff --git a/pulsar-io/netty/src/test/resources/nettyTcpSourceConfigWithInvalidProperty.yaml b/pulsar-io/netty/src/test/resources/nettySourceConfigWithInvalidProperty.yaml
similarity index 98%
rename from pulsar-io/netty/src/test/resources/nettyTcpSourceConfigWithInvalidProperty.yaml
rename to pulsar-io/netty/src/test/resources/nettySourceConfigWithInvalidProperty.yaml
index 8a2bb92..b1f90c2 100644
--- a/pulsar-io/netty/src/test/resources/nettyTcpSourceConfigWithInvalidProperty.yaml
+++ b/pulsar-io/netty/src/test/resources/nettySourceConfigWithInvalidProperty.yaml
@@ -18,7 +18,8 @@
#
{
+"type": "tcp",
"host": "127.0.0.1",
"port": "10911",
"invalidProperty": "5"
-}
\ No newline at end of file
+}
diff --git a/site2/docs/io-connectors.md b/site2/docs/io-connectors.md
index 0250ebc..05a43dc 100644
--- a/site2/docs/io-connectors.md
+++ b/site2/docs/io-connectors.md
@@ -17,4 +17,4 @@ Pulsar Functions cluster.
- [RabbitMQ Source Connector](io-rabbitmq.md#source)
- [Twitter Firehose Source Connector](io-twitter.md)
- [CDC Source Connector based on Debezium](io-cdc.md)
-- [Netty Tcp Source Connector](io-tcp.md#source)
+- [Netty Tcp or Udp Source Connector](io-netty.md#source)
diff --git a/site2/docs/io-tcp.md b/site2/docs/io-netty.md
similarity index 69%
rename from site2/docs/io-tcp.md
rename to site2/docs/io-netty.md
index 8bf3a89..479752f 100644
--- a/site2/docs/io-tcp.md
+++ b/site2/docs/io-netty.md
@@ -1,12 +1,12 @@
---
-id: io-tcp
-title: Netty Tcp Connector
-sidebar_label: Netty Tcp Connector
+id: io-netty
+title: Netty Tcp or Udp Connector
+sidebar_label: Netty Tcp or Udp Connector
---
## Source
-The Netty Tcp Source connector is used to listen Tcp messages from Tcp Client and write them to user-defined Pulsar topic.
+The Netty Tcp or Udp Source connector listens for Tcp/Udp messages from Tcp/Udp clients and writes them to a user-defined Pulsar topic.
Also, this connector is suggested to be used in a containerized (e.g. k8s) deployment.
Otherwise, if the connector is running in process or thread mode, the instances may be conflicting on listening to ports.
@@ -14,6 +14,7 @@ Otherwise, if the connector is running in process or thread mode, the instances
| Name | Required | Default | Description |
|------|----------|---------|-------------|
+| `type` | `false` | `tcp` | The network protocol used by Netty: either `tcp` or `udp`. |
| `host` | `false` | `127.0.0.1` | The host name or address that the source instance to listen on. |
| `port` | `false` | `10999` | The port that the source instance to listen on. |
| `numberOfThreads` | `false` | `1` | The number of threads of Netty Tcp Server to accept incoming connections and handle the traffic of the accepted connections. |