You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2019/01/05 09:32:16 UTC

[GitHub] sijie closed pull request #3295: issue #3276 Pulsar IO connector for udp source

sijie closed pull request #3295:  issue #3276 Pulsar IO connector for udp source
URL: https://github.com/apache/pulsar/pull/3295
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/NettySource.java b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/NettySource.java
new file mode 100644
index 0000000000..215bd344a6
--- /dev/null
+++ b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/NettySource.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.netty;
+
+import org.apache.pulsar.io.core.PushSource;
+import org.apache.pulsar.io.core.SourceContext;
+import org.apache.pulsar.io.core.annotations.Connector;
+import org.apache.pulsar.io.core.annotations.IOType;
+import org.apache.pulsar.io.netty.server.NettyServer;
+import java.util.Map;
+
+/**
+ * A simple Netty Tcp or Udp Source connector to listen Tcp/Udp messages and write to user-defined Pulsar topic
+ */
+@Connector(
+    name = "netty",
+    type = IOType.SOURCE,
+    help = "A simple Netty Tcp or Udp Source connector to listen Tcp/Udp messages and write to user-defined Pulsar topic",
+    configClass = NettySourceConfig.class)
+public class NettySource extends PushSource<byte[]> {
+
+    private NettyServer nettyServer;
+    private Thread thread;
+
+    @Override
+    public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
+        NettySourceConfig nettySourceConfig = NettySourceConfig.load(config);
+        if (nettySourceConfig.getType() == null
+                || nettySourceConfig.getHost() == null
+                || nettySourceConfig.getPort() <= 0) {
+            throw new IllegalArgumentException("Required property not set.");
+        }
+
+        thread = new Thread(new PulsarServerRunnable(nettySourceConfig, this));
+        thread.start();
+    }
+
+    @Override
+    public void close() throws Exception {
+        nettyServer.shutdownGracefully();
+    }
+
+    private class PulsarServerRunnable implements Runnable {
+
+        private NettySourceConfig nettySourceConfig;
+        private NettySource nettySource;
+
+        public PulsarServerRunnable(NettySourceConfig nettySourceConfig, NettySource nettySource) {
+            this.nettySourceConfig = nettySourceConfig;
+            this.nettySource = nettySource;
+        }
+
+        @Override
+        public void run() {
+            nettyServer = new NettyServer.Builder()
+                    .setType(NettyServer.Type.valueOf(nettySourceConfig.getType().toUpperCase()))
+                    .setHost(nettySourceConfig.getHost())
+                    .setPort(nettySourceConfig.getPort())
+                    .setNumberOfThreads(nettySourceConfig.getNumberOfThreads())
+                    .setNettySource(nettySource)
+                    .build();
+
+            nettyServer.run();
+        }
+    }
+
+}
diff --git a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/NettyTcpSourceConfig.java b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/NettySourceConfig.java
similarity index 79%
rename from pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/NettyTcpSourceConfig.java
rename to pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/NettySourceConfig.java
index 07b3cf8b8f..f5d40e930a 100644
--- a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/NettyTcpSourceConfig.java
+++ b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/NettySourceConfig.java
@@ -30,7 +30,7 @@
 import java.util.Map;
 
 /**
- * Netty Tcp Source Connector Config.
+ * Netty Tcp or Udp Source Connector Config.
  */
 @Data
 @Setter
@@ -38,10 +38,16 @@
 @EqualsAndHashCode
 @ToString
 @Accessors(chain = true)
-public class NettyTcpSourceConfig implements Serializable {
+public class NettySourceConfig implements Serializable {
 
     private static final long serialVersionUID = -7116130435021510496L;
 
+    @FieldDoc(
+            required = true,
+            defaultValue = "tcp",
+            help = "The tcp or udp network protocols")
+    private String type = "tcp";
+
     @FieldDoc(
             required = true,
             defaultValue = "127.0.0.1",
@@ -61,14 +67,14 @@
                     "handle the traffic of the accepted connections")
     private int numberOfThreads = 1;
 
-    public static NettyTcpSourceConfig load(Map<String, Object> map) throws IOException {
+    public static NettySourceConfig load(Map<String, Object> map) throws IOException {
         ObjectMapper mapper = new ObjectMapper();
-        return mapper.readValue(new ObjectMapper().writeValueAsString(map), NettyTcpSourceConfig.class);
+        return mapper.readValue(new ObjectMapper().writeValueAsString(map), NettySourceConfig.class);
     }
 
-    public static NettyTcpSourceConfig load(String yamlFile) throws IOException {
+    public static NettySourceConfig load(String yamlFile) throws IOException {
         ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
-        return mapper.readValue(new File(yamlFile), NettyTcpSourceConfig.class);
+        return mapper.readValue(new File(yamlFile), NettySourceConfig.class);
     }
 
 }
diff --git a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/NettyTcpSource.java b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/NettyTcpSource.java
deleted file mode 100644
index 3cef730dc3..0000000000
--- a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/NettyTcpSource.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.io.netty;
-
-import org.apache.pulsar.io.core.PushSource;
-import org.apache.pulsar.io.core.SourceContext;
-import org.apache.pulsar.io.core.annotations.Connector;
-import org.apache.pulsar.io.core.annotations.IOType;
-import org.apache.pulsar.io.netty.tcp.server.NettyTcpServer;
-import java.util.Map;
-
-/**
- * A simple Netty Tcp Source connector to listen Tcp messages and write to user-defined Pulsar topic
- */
-@Connector(
-        name = "tcp",
-        type = IOType.SOURCE,
-        help = "A simple Netty Tcp Source connector to listen Tcp messages and write to user-defined Pulsar topic",
-        configClass = NettyTcpSourceConfig.class)
-public class NettyTcpSource extends PushSource<byte[]> {
-
-    private NettyTcpServer nettyTcpServer;
-    private Thread thread;
-
-    @Override
-    public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
-        NettyTcpSourceConfig nettyTcpSourceConfig = NettyTcpSourceConfig.load(config);
-
-        thread = new Thread(new PulsarTcpServerRunnable(nettyTcpSourceConfig, this));
-        thread.start();
-    }
-
-    @Override
-    public void close() throws Exception {
-        nettyTcpServer.shutdownGracefully();
-    }
-
-    private class PulsarTcpServerRunnable implements Runnable {
-
-        private NettyTcpSourceConfig nettyTcpSourceConfig;
-        private NettyTcpSource nettyTcpSource;
-
-        public PulsarTcpServerRunnable(NettyTcpSourceConfig nettyTcpSourceConfig, NettyTcpSource nettyTcpSource) {
-            this.nettyTcpSourceConfig = nettyTcpSourceConfig;
-            this.nettyTcpSource = nettyTcpSource;
-        }
-
-        @Override
-        public void run() {
-            nettyTcpServer = new NettyTcpServer.Builder()
-                .setHost(nettyTcpSourceConfig.getHost())
-                .setPort(nettyTcpSourceConfig.getPort())
-                .setNumberOfThreads(nettyTcpSourceConfig.getNumberOfThreads())
-                .setNettyTcpSource(nettyTcpSource)
-                .build();
-
-            nettyTcpServer.run();
-        }
-    }
-
-}
diff --git a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/tcp/server/NettyChannelInitializer.java b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyChannelInitializer.java
similarity index 97%
rename from pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/tcp/server/NettyChannelInitializer.java
rename to pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyChannelInitializer.java
index f88f514a4a..b5fa8209ff 100644
--- a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/tcp/server/NettyChannelInitializer.java
+++ b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyChannelInitializer.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.io.netty.tcp.server;
+package org.apache.pulsar.io.netty.server;
 
 import io.netty.channel.ChannelInboundHandlerAdapter;
 import io.netty.channel.ChannelInitializer;
diff --git a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyServer.java b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyServer.java
new file mode 100644
index 0000000000..2cb803153a
--- /dev/null
+++ b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyServer.java
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.netty.server;
+
+import com.google.common.base.Preconditions;
+import io.netty.bootstrap.Bootstrap;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioDatagramChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.io.netty.NettySource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Netty Tcp or Udp Server to accept any incoming data through Tcp.
+ */
+public class NettyServer {
+
+    private static final Logger logger = LoggerFactory.getLogger(NettyServer.class);
+
+    private Type type;
+    private String host;
+    private int port;
+    private NettySource nettySource;
+    private int numberOfThreads;
+    private EventLoopGroup bossGroup;
+    private EventLoopGroup workerGroup;
+
+    private NettyServer(Builder builder) {
+        this.type = builder.type;
+        this.host = builder.host;
+        this.port = builder.port;
+        this.nettySource = builder.nettySource;
+        this.numberOfThreads = builder.numberOfThreads;
+    }
+
+    public void run() {
+        try {
+            switch (type) {
+                case TCP:
+                    runTcp();
+                    break;
+                case UDP:
+                    runUdp();
+                    break;
+                default:
+                    runTcp();
+                    break;
+            }
+        } catch(Exception ex) {
+            logger.error("Error occurred when Netty Tcp or Udp Server is running", ex);
+        } finally {
+            shutdownGracefully();
+        }
+    }
+
+    public void shutdownGracefully() {
+        if (workerGroup != null)
+            workerGroup.shutdownGracefully();
+        if (bossGroup != null)
+            bossGroup.shutdownGracefully();
+    }
+
+    private void runUdp() throws InterruptedException {
+        workerGroup = new NioEventLoopGroup(this.numberOfThreads);
+        Bootstrap bootstrap = new Bootstrap();
+        bootstrap.group(workerGroup);
+        bootstrap.channel(NioDatagramChannel.class);
+        bootstrap.handler(new NettyChannelInitializer(new NettyServerHandler(this.nettySource)))
+                .option(ChannelOption.SO_BACKLOG, 1024);
+
+        ChannelFuture channelFuture = bootstrap.bind(this.host, this.port).sync();
+        channelFuture.channel().closeFuture().sync();
+    }
+
+    private void runTcp() throws InterruptedException {
+        bossGroup = new NioEventLoopGroup(this.numberOfThreads);
+        workerGroup = new NioEventLoopGroup(this.numberOfThreads);
+        ServerBootstrap serverBootstrap = new ServerBootstrap();
+        serverBootstrap.group(bossGroup, workerGroup);
+        serverBootstrap.channel(NioServerSocketChannel.class);
+        serverBootstrap.childHandler(new NettyChannelInitializer(new NettyServerHandler(this.nettySource)))
+                .option(ChannelOption.SO_BACKLOG, 1024)
+                .childOption(ChannelOption.SO_KEEPALIVE, true);
+
+        ChannelFuture channelFuture = serverBootstrap.bind(this.host, this.port).sync();
+        channelFuture.channel().closeFuture().sync();
+    }
+
+    /**
+     * Pulsar Netty Server Builder.
+     */
+    public static class Builder {
+
+        private Type type;
+        private String host;
+        private int port;
+        private NettySource nettySource;
+        private int numberOfThreads;
+
+        public Builder setType(Type type) {
+            this.type = type;
+            return this;
+        }
+
+        public Builder setHost(String host) {
+            this.host = host;
+            return this;
+        }
+
+        public Builder setPort(int port) {
+            this.port = port;
+            return this;
+        }
+
+        public Builder setNettySource(NettySource nettySource) {
+            this.nettySource = nettySource;
+            return this;
+        }
+
+        public Builder setNumberOfThreads(int numberOfThreads) {
+            this.numberOfThreads = numberOfThreads;
+            return this;
+        }
+
+        public NettyServer build() {
+            Preconditions.checkNotNull(this.type, "type cannot be blank/null");
+            Preconditions.checkArgument(StringUtils.isNotBlank(host), "host cannot be blank/null");
+            Preconditions.checkArgument(this.port >= 1024, "port must be set equal or bigger than 1024");
+            Preconditions.checkNotNull(this.nettySource, "nettySource must be set");
+            Preconditions.checkArgument(this.numberOfThreads > 0,
+                    "numberOfThreads must be set as positive");
+
+            return new NettyServer(this);
+        }
+    }
+
+    /**
+     * tcp or udp network protocol
+     */
+    public enum Type {
+
+        TCP,
+
+        UDP
+    }
+
+}
diff --git a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/tcp/server/NettyTcpServerHandler.java b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyServerHandler.java
similarity index 74%
rename from pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/tcp/server/NettyTcpServerHandler.java
rename to pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyServerHandler.java
index 2fd2cf3c69..81fd2037ac 100644
--- a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/tcp/server/NettyTcpServerHandler.java
+++ b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyServerHandler.java
@@ -16,12 +16,12 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.io.netty.tcp.server;
+package org.apache.pulsar.io.netty.server;
 
 import io.netty.channel.*;
 import lombok.Data;
 import org.apache.pulsar.functions.api.Record;
-import org.apache.pulsar.io.netty.NettyTcpSource;
+import org.apache.pulsar.io.netty.NettySource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -32,19 +32,19 @@
  * Handles a server-side channel
  */
 @ChannelHandler.Sharable
-public class NettyTcpServerHandler extends SimpleChannelInboundHandler<byte[]> {
+public class NettyServerHandler extends SimpleChannelInboundHandler<byte[]> {
 
-    private static final Logger logger = LoggerFactory.getLogger(NettyTcpServerHandler.class);
+    private static final Logger logger = LoggerFactory.getLogger(NettyServerHandler.class);
 
-    private NettyTcpSource nettyTcpSource;
+    private NettySource nettySource;
 
-    public NettyTcpServerHandler(NettyTcpSource nettyTcpSource) {
-        this.nettyTcpSource = nettyTcpSource;
+    public NettyServerHandler(NettySource nettySource) {
+        this.nettySource = nettySource;
     }
 
     @Override
     protected void channelRead0(ChannelHandlerContext channelHandlerContext, byte[] bytes) throws Exception {
-        nettyTcpSource.consume(new NettyTcpRecord(Optional.ofNullable(""), bytes));
+        nettySource.consume(new NettyRecord(Optional.ofNullable(""), bytes));
     }
 
     @Override
@@ -54,7 +54,7 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
     }
 
     @Data
-    static private class NettyTcpRecord implements Record<byte[]>, Serializable {
+    static private class NettyRecord implements Record<byte[]>, Serializable {
         private final Optional<String> key;
         private final byte[] value;
     }
diff --git a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/tcp/server/NettyTcpServer.java b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/tcp/server/NettyTcpServer.java
deleted file mode 100644
index b471b03697..0000000000
--- a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/tcp/server/NettyTcpServer.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.io.netty.tcp.server;
-
-import com.google.common.base.Preconditions;
-import io.netty.bootstrap.ServerBootstrap;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelOption;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.channel.socket.nio.NioServerSocketChannel;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.pulsar.io.netty.NettyTcpSource;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Netty Tcp Server to accept any incoming data through Tcp.
- */
-public class NettyTcpServer {
-
-    private static final Logger logger = LoggerFactory.getLogger(NettyTcpServer.class);
-
-    private String host;
-    private int port;
-    private NettyTcpSource nettyTcpSource;
-    private int numberOfThreads;
-    private EventLoopGroup bossGroup;
-    private EventLoopGroup workerGroup;
-
-    private NettyTcpServer(Builder builder) {
-        this.host = builder.host;
-        this.port = builder.port;
-        this.nettyTcpSource = builder.nettyTcpSource;
-        this.numberOfThreads = builder.numberOfThreads;
-    }
-
-    public void run() {
-        try {
-            bossGroup = new NioEventLoopGroup(this.numberOfThreads);
-            workerGroup = new NioEventLoopGroup(this.numberOfThreads);
-
-            ServerBootstrap serverBootstrap = new ServerBootstrap();
-            serverBootstrap.group(bossGroup, workerGroup)
-                    .channel(NioServerSocketChannel.class)
-                    .childHandler(new NettyChannelInitializer(new NettyTcpServerHandler(this.nettyTcpSource)))
-                    .option(ChannelOption.SO_BACKLOG, 1024)
-                    .childOption(ChannelOption.SO_KEEPALIVE, true);
-
-            ChannelFuture channelFuture = serverBootstrap.bind(this.host, this.port).sync();
-            channelFuture.channel().closeFuture().sync();
-        } catch(Exception ex) {
-            logger.error("Error occurred when Netty Tcp Server is running", ex);
-        } finally {
-            shutdownGracefully();
-        }
-    }
-
-    public void shutdownGracefully() {
-        if (workerGroup != null)
-            workerGroup.shutdownGracefully();
-        if (bossGroup != null)
-            bossGroup.shutdownGracefully();
-    }
-
-    /**
-     * Pulsar Tcp Server Builder.
-     */
-    public static class Builder {
-
-        private String host;
-        private int port;
-        private NettyTcpSource nettyTcpSource;
-        private int numberOfThreads;
-
-        public Builder setHost(String host) {
-            this.host = host;
-            return this;
-        }
-
-        public Builder setPort(int port) {
-            this.port = port;
-            return this;
-        }
-
-        public Builder setNettyTcpSource(NettyTcpSource nettyTcpSource) {
-            this.nettyTcpSource = nettyTcpSource;
-            return this;
-        }
-
-        public Builder setNumberOfThreads(int numberOfThreads) {
-            this.numberOfThreads = numberOfThreads;
-            return this;
-        }
-
-        public NettyTcpServer build() {
-            Preconditions.checkArgument(StringUtils.isNotBlank(host), "host cannot be blank/null");
-            Preconditions.checkArgument(this.port >= 1024, "port must be set equal or bigger than 1024");
-            Preconditions.checkNotNull(this.nettyTcpSource, "nettyTcpSource must be set");
-            Preconditions.checkArgument(this.numberOfThreads > 0,
-                    "numberOfThreads must be set as positive");
-
-            return new NettyTcpServer(this);
-        }
-    }
-
-}
\ No newline at end of file
diff --git a/pulsar-io/netty/src/main/resources/META-INF/services/pulsar-io.yaml b/pulsar-io/netty/src/main/resources/META-INF/services/pulsar-io.yaml
index 58744ce794..22c4af4599 100644
--- a/pulsar-io/netty/src/main/resources/META-INF/services/pulsar-io.yaml
+++ b/pulsar-io/netty/src/main/resources/META-INF/services/pulsar-io.yaml
@@ -17,6 +17,6 @@
 # under the License.
 #
 
-name: tcp
-description: Netty Tcp Source Connector
-sourceClass: org.apache.pulsar.io.netty.NettyTcpSource
+name: netty
+description: Netty Tcp or Udp Source Connector
+sourceClass: org.apache.pulsar.io.netty.NettySource
diff --git a/pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/NettyTcpSourceConfigTest.java b/pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/NettySourceConfigTest.java
similarity index 65%
rename from pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/NettyTcpSourceConfigTest.java
rename to pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/NettySourceConfigTest.java
index efc763a907..034fa7979b 100644
--- a/pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/NettyTcpSourceConfigTest.java
+++ b/pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/NettySourceConfigTest.java
@@ -30,24 +30,27 @@
 import static org.junit.Assert.assertNotNull;
 
 /**
- * Tests for Netty Tcp Source Config
+ * Tests for Netty Tcp or Udp Source Config
  */
-public class NettyTcpSourceConfigTest {
+public class NettySourceConfigTest {
 
     private static final String LOCALHOST = "127.0.0.1";
+    private static final String TCP = "tcp";
 
     @Test
     public void testNettyTcpConfigLoadWithMap() throws IOException {
         Map<String, Object> map = new HashMap<>();
+        map.put("type", TCP);
         map.put("host", LOCALHOST);
         map.put("port", 10999);
         map.put("numberOfThreads", 1);
 
-        NettyTcpSourceConfig nettyTcpSourceConfig = NettyTcpSourceConfig.load(map);
-        assertNotNull(nettyTcpSourceConfig);
-        assertEquals(LOCALHOST, nettyTcpSourceConfig.getHost());
-        assertEquals(10999, nettyTcpSourceConfig.getPort());
-        assertEquals(1, nettyTcpSourceConfig.getNumberOfThreads());
+        NettySourceConfig nettySourceConfig = NettySourceConfig.load(map);
+        assertNotNull(nettySourceConfig);
+        assertEquals(TCP, nettySourceConfig.getType());
+        assertEquals(LOCALHOST, nettySourceConfig.getHost());
+        assertEquals(10999, nettySourceConfig.getPort());
+        assertEquals(1, nettySourceConfig.getNumberOfThreads());
     }
 
     @Test(expected = UnrecognizedPropertyException.class)
@@ -55,23 +58,24 @@ public void testNettyTcpConfigLoadWithMapWhenInvalidPropertyIsSet() throws IOExc
         Map<String, Object> map = new HashMap<>();
         map.put("invalidProperty", 1);
 
-        NettyTcpSourceConfig.load(map);
+        NettySourceConfig.load(map);
     }
 
     @Test
     public void testNettyTcpConfigLoadWithYamlFile() throws IOException {
-        File yamlFile = getFile("nettyTcpSourceConfig.yaml");
-        NettyTcpSourceConfig nettyTcpSourceConfig = NettyTcpSourceConfig.load(yamlFile.getAbsolutePath());
-        assertNotNull(nettyTcpSourceConfig);
-        assertEquals(LOCALHOST, nettyTcpSourceConfig.getHost());
-        assertEquals(10911, nettyTcpSourceConfig.getPort());
-        assertEquals(5, nettyTcpSourceConfig.getNumberOfThreads());
+        File yamlFile = getFile("nettySourceConfig.yaml");
+        NettySourceConfig nettySourceConfig = NettySourceConfig.load(yamlFile.getAbsolutePath());
+        assertNotNull(nettySourceConfig);
+        assertEquals(TCP, nettySourceConfig.getType());
+        assertEquals(LOCALHOST, nettySourceConfig.getHost());
+        assertEquals(10911, nettySourceConfig.getPort());
+        assertEquals(5, nettySourceConfig.getNumberOfThreads());
     }
 
     @Test(expected = UnrecognizedPropertyException.class)
     public void testNettyTcpConfigLoadWithYamlFileWhenInvalidPropertyIsSet() throws IOException {
-        File yamlFile = getFile("nettyTcpSourceConfigWithInvalidProperty.yaml");
-        NettyTcpSourceConfig.load(yamlFile.getAbsolutePath());
+        File yamlFile = getFile("nettySourceConfigWithInvalidProperty.yaml");
+        NettySourceConfig.load(yamlFile.getAbsolutePath());
     }
 
     private File getFile(String name) {
diff --git a/pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/tcp/server/NettyChannelInitializerTest.java b/pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/server/NettyChannelInitializerTest.java
similarity index 89%
rename from pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/tcp/server/NettyChannelInitializerTest.java
rename to pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/server/NettyChannelInitializerTest.java
index a5243e6640..b8d6dd45ea 100644
--- a/pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/tcp/server/NettyChannelInitializerTest.java
+++ b/pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/server/NettyChannelInitializerTest.java
@@ -16,10 +16,10 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.io.netty.tcp.server;
+package org.apache.pulsar.io.netty.server;
 
 import io.netty.channel.socket.nio.NioSocketChannel;
-import org.apache.pulsar.io.netty.NettyTcpSource;
+import org.apache.pulsar.io.netty.NettySource;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
@@ -35,7 +35,7 @@ public void testChannelInitializer() throws Exception {
         NioSocketChannel channel = new NioSocketChannel();
 
         NettyChannelInitializer nettyChannelInitializer = new NettyChannelInitializer(
-                new NettyTcpServerHandler(new NettyTcpSource()));
+                new NettyServerHandler(new NettySource()));
         nettyChannelInitializer.initChannel(channel);
 
         assertNotNull(channel.pipeline().toMap());
diff --git a/pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/tcp/server/NettyTcpServerTest.java b/pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/server/NettyServerTest.java
similarity index 51%
rename from pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/tcp/server/NettyTcpServerTest.java
rename to pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/server/NettyServerTest.java
index 0c2f56b083..80415707bc 100644
--- a/pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/tcp/server/NettyTcpServerTest.java
+++ b/pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/server/NettyServerTest.java
@@ -16,27 +16,71 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.io.netty.tcp.server;
+package org.apache.pulsar.io.netty.server;
 
-import org.apache.pulsar.io.netty.NettyTcpSource;
+import org.apache.pulsar.io.netty.NettySource;
+import org.apache.pulsar.io.netty.NettySourceConfig;
 import org.junit.Test;
 
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 
 /**
- * Tests for Netty Tcp Server
+ * Tests for Netty Tcp or Udp Server
  */
-public class NettyTcpServerTest {
+public class NettyServerTest {
 
     private static final String LOCALHOST = "127.0.0.1";
+    private static final String TCP = "TCP";
+    private static final String UDP = "UDP";
 
     @Test
     public void testNettyTcpServerConstructor() {
-        NettyTcpServer nettyTcpServer = new NettyTcpServer.Builder()
+        NettyServer nettyTcpServer = new NettyServer.Builder()
+                .setType(NettyServer.Type.valueOf(TCP))
                 .setHost(LOCALHOST)
                 .setPort(10999)
                 .setNumberOfThreads(2)
-                .setNettyTcpSource(new NettyTcpSource())
+                .setNettySource(new NettySource())
+                .build();
+
+        assertNotNull(nettyTcpServer);
+    }
+
+    @Test
+    public void testNettyUdpServerConstructor() {
+        NettyServer nettyUdpServer = new NettyServer.Builder()
+                .setType(NettyServer.Type.valueOf(UDP))
+                .setHost(LOCALHOST)
+                .setPort(10999)
+                .setNumberOfThreads(2)
+                .setNettySource(new NettySource())
+                .build();
+
+        assertNotNull(nettyUdpServer);
+    }
+
+    @Test
+    public void testNettyTcpServerByNettySourceConfig() throws IOException {
+        Map<String, Object> map = new HashMap<>();
+        map.put("type", "tcp");
+        map.put("host", LOCALHOST);
+        map.put("port", 10999);
+        map.put("numberOfThreads", 1);
+
+        NettySourceConfig nettySourceConfig = NettySourceConfig.load(map);
+
+        // test NettySource run function NettyServer Builder
+        NettyServer nettyTcpServer = new NettyServer.Builder()
+                .setType(NettyServer.Type.valueOf(nettySourceConfig.getType().toUpperCase()))
+                .setHost(nettySourceConfig.getHost())
+                .setPort(nettySourceConfig.getPort())
+                .setNumberOfThreads(nettySourceConfig.getNumberOfThreads())
+                .setNettySource(new NettySource())
                 .build();
 
         assertNotNull(nettyTcpServer);
@@ -44,36 +88,38 @@ public void testNettyTcpServerConstructor() {
 
     @Test(expected = IllegalArgumentException.class)
     public void testNettyTcpServerConstructorWhenHostIsNotSet() {
-        new NettyTcpServer.Builder()
+        new NettyServer.Builder()
+                .setType(NettyServer.Type.valueOf(TCP))
                 .setPort(10999)
                 .setNumberOfThreads(2)
-                .setNettyTcpSource(new NettyTcpSource())
+                .setNettySource(new NettySource())
                 .build();
     }
 
     @Test(expected = IllegalArgumentException.class)
     public void testNettyTcpServerConstructorWhenPortIsNotSet() {
-        new NettyTcpServer.Builder()
+        new NettyServer.Builder()
+                .setType(NettyServer.Type.valueOf(TCP))
                 .setHost(LOCALHOST)
                 .setNumberOfThreads(2)
-                .setNettyTcpSource(new NettyTcpSource())
+                .setNettySource(new NettySource())
                 .build();
     }
 
-
     @Test(expected = IllegalArgumentException.class)
     public void testNettyTcpServerConstructorWhenNumberOfThreadsIsNotSet() {
-        new NettyTcpServer.Builder()
+        new NettyServer.Builder()
+                .setType(NettyServer.Type.valueOf(TCP))
                 .setHost(LOCALHOST)
                 .setPort(10999)
-                .setNettyTcpSource(new NettyTcpSource())
+                .setNettySource(new NettySource())
                 .build();
     }
 
-
     @Test(expected = NullPointerException.class)
     public void testNettyTcpServerConstructorWhenNettyTcpSourceIsNotSet() {
-        new NettyTcpServer.Builder()
+        new NettyServer.Builder()
+                .setType(NettyServer.Type.valueOf(TCP))
                 .setHost(LOCALHOST)
                 .setPort(10999)
                 .setNumberOfThreads(2)
@@ -82,41 +128,45 @@ public void testNettyTcpServerConstructorWhenNettyTcpSourceIsNotSet() {
 
     @Test(expected = IllegalArgumentException.class)
     public void testNettyTcpServerWhenHostIsSetAsBlank() {
-        new NettyTcpServer.Builder()
+        new NettyServer.Builder()
+                .setType(NettyServer.Type.valueOf(TCP))
                 .setHost(" ")
                 .setPort(10999)
                 .setNumberOfThreads(2)
-                .setNettyTcpSource(new NettyTcpSource())
+                .setNettySource(new NettySource())
                 .build();
     }
 
     @Test(expected = IllegalArgumentException.class)
     public void testNettyTcpServerWhenPortIsSetAsZero() {
-        new NettyTcpServer.Builder()
+        new NettyServer.Builder()
+                .setType(NettyServer.Type.valueOf(TCP))
                 .setHost(LOCALHOST)
                 .setPort(0)
                 .setNumberOfThreads(2)
-                .setNettyTcpSource(new NettyTcpSource())
+                .setNettySource(new NettySource())
                 .build();
     }
 
     @Test(expected = IllegalArgumentException.class)
     public void testNettyTcpServerWhenPortIsSetLowerThan1024() {
-        new NettyTcpServer.Builder()
+        new NettyServer.Builder()
+                .setType(NettyServer.Type.valueOf(TCP))
                 .setHost(LOCALHOST)
                 .setPort(1022)
                 .setNumberOfThreads(2)
-                .setNettyTcpSource(new NettyTcpSource())
+                .setNettySource(new NettySource())
                 .build();
     }
 
     @Test(expected = IllegalArgumentException.class)
     public void testNettyTcpServerWhenNumberOfThreadsIsSetAsZero() {
-        new NettyTcpServer.Builder()
+        new NettyServer.Builder()
+                .setType(NettyServer.Type.valueOf(TCP))
                 .setHost(LOCALHOST)
                 .setPort(10999)
                 .setNumberOfThreads(0)
-                .setNettyTcpSource(new NettyTcpSource())
+                .setNettySource(new NettySource())
                 .build();
     }
 
diff --git a/pulsar-io/netty/src/test/resources/nettyTcpSourceConfig.yaml b/pulsar-io/netty/src/test/resources/nettySourceConfig.yaml
similarity index 98%
rename from pulsar-io/netty/src/test/resources/nettyTcpSourceConfig.yaml
rename to pulsar-io/netty/src/test/resources/nettySourceConfig.yaml
index ca748cc4ad..eff1a9ce1f 100644
--- a/pulsar-io/netty/src/test/resources/nettyTcpSourceConfig.yaml
+++ b/pulsar-io/netty/src/test/resources/nettySourceConfig.yaml
@@ -18,7 +18,8 @@
 #
 
 {
+"type": "tcp",
 "host": "127.0.0.1",
 "port": "10911",
 "numberOfThreads": "5"
-}
\ No newline at end of file
+}
diff --git a/pulsar-io/netty/src/test/resources/nettyTcpSourceConfigWithInvalidProperty.yaml b/pulsar-io/netty/src/test/resources/nettySourceConfigWithInvalidProperty.yaml
similarity index 98%
rename from pulsar-io/netty/src/test/resources/nettyTcpSourceConfigWithInvalidProperty.yaml
rename to pulsar-io/netty/src/test/resources/nettySourceConfigWithInvalidProperty.yaml
index 8a2bb92d19..b1f90c2d1e 100644
--- a/pulsar-io/netty/src/test/resources/nettyTcpSourceConfigWithInvalidProperty.yaml
+++ b/pulsar-io/netty/src/test/resources/nettySourceConfigWithInvalidProperty.yaml
@@ -18,7 +18,8 @@
 #
 
 {
+"type": "tcp",
 "host": "127.0.0.1",
 "port": "10911",
 "invalidProperty": "5"
-}
\ No newline at end of file
+}
diff --git a/site2/docs/io-connectors.md b/site2/docs/io-connectors.md
index 0250ebc544..05a43dc40b 100644
--- a/site2/docs/io-connectors.md
+++ b/site2/docs/io-connectors.md
@@ -17,4 +17,4 @@ Pulsar Functions cluster.
 - [RabbitMQ Source Connector](io-rabbitmq.md#source)
 - [Twitter Firehose Source Connector](io-twitter.md)
 - [CDC Source Connector based on Debezium](io-cdc.md)
-- [Netty Tcp Source Connector](io-tcp.md#source)
+- [Netty Tcp or Udp Source Connector](io-netty.md#source)
diff --git a/site2/docs/io-tcp.md b/site2/docs/io-netty.md
similarity index 69%
rename from site2/docs/io-tcp.md
rename to site2/docs/io-netty.md
index 8bf3a8959c..479752f9ad 100644
--- a/site2/docs/io-tcp.md
+++ b/site2/docs/io-netty.md
@@ -1,12 +1,12 @@
 ---
-id: io-tcp
-title: Netty Tcp Connector
-sidebar_label: Netty Tcp Connector
+id: io-netty
+title: Netty Tcp or Udp Connector
+sidebar_label: Netty Tcp or Udp Connector
 ---
 
 ## Source
 
-The Netty Tcp Source connector is used to listen Tcp messages from Tcp Client and write them to user-defined Pulsar topic.
+The Netty Tcp or Udp Source connector is used to listen Tcp/Udp messages from Tcp/Udp Client and write them to user-defined Pulsar topic.
 Also, this connector is suggested to be used in a containerized (e.g. k8s) deployment.
 Otherwise, if the connector is running in process or thread mode, the instances may be conflicting on listening to ports.
 
@@ -14,6 +14,7 @@ Otherwise, if the connector is running in process or thread mode, the instances
 
 | Name | Required | Default | Description |
 |------|----------|---------|-------------|
+| `type` | `false` | `tcp` | The tcp or udp network protocol required by netty. |
 | `host` | `false` | `127.0.0.1` | The host name or address that the source instance to listen on. |
 | `port` | `false` | `10999` | The port that the source instance to listen on. |
 | `numberOfThreads` | `false` | `1` | The number of threads of Netty Tcp Server to accept incoming connections and handle the traffic of the accepted connections. |


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services