You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/01/05 09:32:19 UTC

[pulsar] branch master updated: issue #3276 Pulsar IO connector for udp source (#3295)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 1d22571   issue #3276 Pulsar IO connector for udp source (#3295)
1d22571 is described below

commit 1d2257110ed30012096ec5ef783805d0f1f09a78
Author: wpl <12...@qq.com>
AuthorDate: Sat Jan 5 17:32:14 2019 +0800

     issue #3276 Pulsar IO connector for udp source (#3295)
    
    ### Motivation
    
    Fixes #3276
    
    ### Result
    
    Pulsar IO connector for udp source
---
 .../org/apache/pulsar/io/netty/NettySource.java    |  83 ++++++++++
 ...TcpSourceConfig.java => NettySourceConfig.java} |  18 ++-
 .../org/apache/pulsar/io/netty/NettyTcpSource.java |  77 ----------
 .../{tcp => }/server/NettyChannelInitializer.java  |   2 +-
 .../apache/pulsar/io/netty/server/NettyServer.java | 169 +++++++++++++++++++++
 .../NettyServerHandler.java}                       |  18 +--
 .../pulsar/io/netty/tcp/server/NettyTcpServer.java | 123 ---------------
 .../resources/META-INF/services/pulsar-io.yaml     |   6 +-
 ...eConfigTest.java => NettySourceConfigTest.java} |  36 +++--
 .../server/NettyChannelInitializerTest.java        |   6 +-
 .../NettyServerTest.java}                          |  96 +++++++++---
 ...TcpSourceConfig.yaml => nettySourceConfig.yaml} |   3 +-
 ...l => nettySourceConfigWithInvalidProperty.yaml} |   3 +-
 site2/docs/io-connectors.md                        |   2 +-
 site2/docs/{io-tcp.md => io-netty.md}              |   9 +-
 15 files changed, 383 insertions(+), 268 deletions(-)

diff --git a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/NettySource.java b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/NettySource.java
new file mode 100644
index 0000000..215bd34
--- /dev/null
+++ b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/NettySource.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.netty;
+
+import org.apache.pulsar.io.core.PushSource;
+import org.apache.pulsar.io.core.SourceContext;
+import org.apache.pulsar.io.core.annotations.Connector;
+import org.apache.pulsar.io.core.annotations.IOType;
+import org.apache.pulsar.io.netty.server.NettyServer;
+import java.util.Map;
+
+/**
+ * A simple Netty Tcp or Udp Source connector to listen Tcp/Udp messages and write to user-defined Pulsar topic
+ */
+@Connector(
+    name = "netty",
+    type = IOType.SOURCE,
+    help = "A simple Netty Tcp or Udp Source connector to listen Tcp/Udp messages and write to user-defined Pulsar topic",
+    configClass = NettySourceConfig.class)
+public class NettySource extends PushSource<byte[]> {
+
+    private NettyServer nettyServer;
+    private Thread thread;
+
+    @Override
+    public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
+        NettySourceConfig nettySourceConfig = NettySourceConfig.load(config);
+        if (nettySourceConfig.getType() == null
+                || nettySourceConfig.getHost() == null
+                || nettySourceConfig.getPort() <= 0) {
+            throw new IllegalArgumentException("Required property not set.");
+        }
+
+        thread = new Thread(new PulsarServerRunnable(nettySourceConfig, this));
+        thread.start();
+    }
+
+    @Override
+    public void close() throws Exception {
+        nettyServer.shutdownGracefully();
+    }
+
+    private class PulsarServerRunnable implements Runnable {
+
+        private NettySourceConfig nettySourceConfig;
+        private NettySource nettySource;
+
+        public PulsarServerRunnable(NettySourceConfig nettySourceConfig, NettySource nettySource) {
+            this.nettySourceConfig = nettySourceConfig;
+            this.nettySource = nettySource;
+        }
+
+        @Override
+        public void run() {
+            nettyServer = new NettyServer.Builder()
+                    .setType(NettyServer.Type.valueOf(nettySourceConfig.getType().toUpperCase()))
+                    .setHost(nettySourceConfig.getHost())
+                    .setPort(nettySourceConfig.getPort())
+                    .setNumberOfThreads(nettySourceConfig.getNumberOfThreads())
+                    .setNettySource(nettySource)
+                    .build();
+
+            nettyServer.run();
+        }
+    }
+
+}
diff --git a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/NettyTcpSourceConfig.java b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/NettySourceConfig.java
similarity index 79%
rename from pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/NettyTcpSourceConfig.java
rename to pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/NettySourceConfig.java
index 07b3cf8..f5d40e9 100644
--- a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/NettyTcpSourceConfig.java
+++ b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/NettySourceConfig.java
@@ -30,7 +30,7 @@ import java.io.Serializable;
 import java.util.Map;
 
 /**
- * Netty Tcp Source Connector Config.
+ * Netty Tcp or Udp Source Connector Config.
  */
 @Data
 @Setter
@@ -38,12 +38,18 @@ import java.util.Map;
 @EqualsAndHashCode
 @ToString
 @Accessors(chain = true)
-public class NettyTcpSourceConfig implements Serializable {
+public class NettySourceConfig implements Serializable {
 
     private static final long serialVersionUID = -7116130435021510496L;
 
     @FieldDoc(
             required = true,
+            defaultValue = "tcp",
+            help = "The tcp or udp network protocols")
+    private String type = "tcp";
+
+    @FieldDoc(
+            required = true,
             defaultValue = "127.0.0.1",
             help = "The host name or address that the source instance to listen on")
     private String host = "127.0.0.1";
@@ -61,14 +67,14 @@ public class NettyTcpSourceConfig implements Serializable {
                     "handle the traffic of the accepted connections")
     private int numberOfThreads = 1;
 
-    public static NettyTcpSourceConfig load(Map<String, Object> map) throws IOException {
+    public static NettySourceConfig load(Map<String, Object> map) throws IOException {
         ObjectMapper mapper = new ObjectMapper();
-        return mapper.readValue(new ObjectMapper().writeValueAsString(map), NettyTcpSourceConfig.class);
+        return mapper.readValue(new ObjectMapper().writeValueAsString(map), NettySourceConfig.class);
     }
 
-    public static NettyTcpSourceConfig load(String yamlFile) throws IOException {
+    public static NettySourceConfig load(String yamlFile) throws IOException {
         ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
-        return mapper.readValue(new File(yamlFile), NettyTcpSourceConfig.class);
+        return mapper.readValue(new File(yamlFile), NettySourceConfig.class);
     }
 
 }
diff --git a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/NettyTcpSource.java b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/NettyTcpSource.java
deleted file mode 100644
index 3cef730..0000000
--- a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/NettyTcpSource.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.io.netty;
-
-import org.apache.pulsar.io.core.PushSource;
-import org.apache.pulsar.io.core.SourceContext;
-import org.apache.pulsar.io.core.annotations.Connector;
-import org.apache.pulsar.io.core.annotations.IOType;
-import org.apache.pulsar.io.netty.tcp.server.NettyTcpServer;
-import java.util.Map;
-
-/**
- * A simple Netty Tcp Source connector to listen Tcp messages and write to user-defined Pulsar topic
- */
-@Connector(
-        name = "tcp",
-        type = IOType.SOURCE,
-        help = "A simple Netty Tcp Source connector to listen Tcp messages and write to user-defined Pulsar topic",
-        configClass = NettyTcpSourceConfig.class)
-public class NettyTcpSource extends PushSource<byte[]> {
-
-    private NettyTcpServer nettyTcpServer;
-    private Thread thread;
-
-    @Override
-    public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
-        NettyTcpSourceConfig nettyTcpSourceConfig = NettyTcpSourceConfig.load(config);
-
-        thread = new Thread(new PulsarTcpServerRunnable(nettyTcpSourceConfig, this));
-        thread.start();
-    }
-
-    @Override
-    public void close() throws Exception {
-        nettyTcpServer.shutdownGracefully();
-    }
-
-    private class PulsarTcpServerRunnable implements Runnable {
-
-        private NettyTcpSourceConfig nettyTcpSourceConfig;
-        private NettyTcpSource nettyTcpSource;
-
-        public PulsarTcpServerRunnable(NettyTcpSourceConfig nettyTcpSourceConfig, NettyTcpSource nettyTcpSource) {
-            this.nettyTcpSourceConfig = nettyTcpSourceConfig;
-            this.nettyTcpSource = nettyTcpSource;
-        }
-
-        @Override
-        public void run() {
-            nettyTcpServer = new NettyTcpServer.Builder()
-                .setHost(nettyTcpSourceConfig.getHost())
-                .setPort(nettyTcpSourceConfig.getPort())
-                .setNumberOfThreads(nettyTcpSourceConfig.getNumberOfThreads())
-                .setNettyTcpSource(nettyTcpSource)
-                .build();
-
-            nettyTcpServer.run();
-        }
-    }
-
-}
diff --git a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/tcp/server/NettyChannelInitializer.java b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyChannelInitializer.java
similarity index 97%
rename from pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/tcp/server/NettyChannelInitializer.java
rename to pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyChannelInitializer.java
index f88f514..b5fa820 100644
--- a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/tcp/server/NettyChannelInitializer.java
+++ b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyChannelInitializer.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.io.netty.tcp.server;
+package org.apache.pulsar.io.netty.server;
 
 import io.netty.channel.ChannelInboundHandlerAdapter;
 import io.netty.channel.ChannelInitializer;
diff --git a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyServer.java b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyServer.java
new file mode 100644
index 0000000..2cb8031
--- /dev/null
+++ b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyServer.java
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.netty.server;
+
+import com.google.common.base.Preconditions;
+import io.netty.bootstrap.Bootstrap;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioDatagramChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.io.netty.NettySource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Netty Tcp or Udp Server to accept any incoming data through Tcp.
+ */
+public class NettyServer {
+
+    private static final Logger logger = LoggerFactory.getLogger(NettyServer.class);
+
+    private Type type;
+    private String host;
+    private int port;
+    private NettySource nettySource;
+    private int numberOfThreads;
+    private EventLoopGroup bossGroup;
+    private EventLoopGroup workerGroup;
+
+    private NettyServer(Builder builder) {
+        this.type = builder.type;
+        this.host = builder.host;
+        this.port = builder.port;
+        this.nettySource = builder.nettySource;
+        this.numberOfThreads = builder.numberOfThreads;
+    }
+
+    public void run() {
+        try {
+            switch (type) {
+                case TCP:
+                    runTcp();
+                    break;
+                case UDP:
+                    runUdp();
+                    break;
+                default:
+                    runTcp();
+                    break;
+            }
+        } catch(Exception ex) {
+            logger.error("Error occurred when Netty Tcp or Udp Server is running", ex);
+        } finally {
+            shutdownGracefully();
+        }
+    }
+
+    public void shutdownGracefully() {
+        if (workerGroup != null)
+            workerGroup.shutdownGracefully();
+        if (bossGroup != null)
+            bossGroup.shutdownGracefully();
+    }
+
+    private void runUdp() throws InterruptedException {
+        workerGroup = new NioEventLoopGroup(this.numberOfThreads);
+        Bootstrap bootstrap = new Bootstrap();
+        bootstrap.group(workerGroup);
+        bootstrap.channel(NioDatagramChannel.class);
+        bootstrap.handler(new NettyChannelInitializer(new NettyServerHandler(this.nettySource)))
+                .option(ChannelOption.SO_BACKLOG, 1024);
+
+        ChannelFuture channelFuture = bootstrap.bind(this.host, this.port).sync();
+        channelFuture.channel().closeFuture().sync();
+    }
+
+    private void runTcp() throws InterruptedException {
+        bossGroup = new NioEventLoopGroup(this.numberOfThreads);
+        workerGroup = new NioEventLoopGroup(this.numberOfThreads);
+        ServerBootstrap serverBootstrap = new ServerBootstrap();
+        serverBootstrap.group(bossGroup, workerGroup);
+        serverBootstrap.channel(NioServerSocketChannel.class);
+        serverBootstrap.childHandler(new NettyChannelInitializer(new NettyServerHandler(this.nettySource)))
+                .option(ChannelOption.SO_BACKLOG, 1024)
+                .childOption(ChannelOption.SO_KEEPALIVE, true);
+
+        ChannelFuture channelFuture = serverBootstrap.bind(this.host, this.port).sync();
+        channelFuture.channel().closeFuture().sync();
+    }
+
+    /**
+     * Pulsar Netty Server Builder.
+     */
+    public static class Builder {
+
+        private Type type;
+        private String host;
+        private int port;
+        private NettySource nettySource;
+        private int numberOfThreads;
+
+        public Builder setType(Type type) {
+            this.type = type;
+            return this;
+        }
+
+        public Builder setHost(String host) {
+            this.host = host;
+            return this;
+        }
+
+        public Builder setPort(int port) {
+            this.port = port;
+            return this;
+        }
+
+        public Builder setNettySource(NettySource nettySource) {
+            this.nettySource = nettySource;
+            return this;
+        }
+
+        public Builder setNumberOfThreads(int numberOfThreads) {
+            this.numberOfThreads = numberOfThreads;
+            return this;
+        }
+
+        public NettyServer build() {
+            Preconditions.checkNotNull(this.type, "type cannot be blank/null");
+            Preconditions.checkArgument(StringUtils.isNotBlank(host), "host cannot be blank/null");
+            Preconditions.checkArgument(this.port >= 1024, "port must be set equal or bigger than 1024");
+            Preconditions.checkNotNull(this.nettySource, "nettySource must be set");
+            Preconditions.checkArgument(this.numberOfThreads > 0,
+                    "numberOfThreads must be set as positive");
+
+            return new NettyServer(this);
+        }
+    }
+
+    /**
+     * tcp or udp network protocol
+     */
+    public enum Type {
+
+        TCP,
+
+        UDP
+    }
+
+}
diff --git a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/tcp/server/NettyTcpServerHandler.java b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyServerHandler.java
similarity index 74%
rename from pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/tcp/server/NettyTcpServerHandler.java
rename to pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyServerHandler.java
index 2fd2cf3..81fd203 100644
--- a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/tcp/server/NettyTcpServerHandler.java
+++ b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyServerHandler.java
@@ -16,12 +16,12 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.io.netty.tcp.server;
+package org.apache.pulsar.io.netty.server;
 
 import io.netty.channel.*;
 import lombok.Data;
 import org.apache.pulsar.functions.api.Record;
-import org.apache.pulsar.io.netty.NettyTcpSource;
+import org.apache.pulsar.io.netty.NettySource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -32,19 +32,19 @@ import java.util.Optional;
  * Handles a server-side channel
  */
 @ChannelHandler.Sharable
-public class NettyTcpServerHandler extends SimpleChannelInboundHandler<byte[]> {
+public class NettyServerHandler extends SimpleChannelInboundHandler<byte[]> {
 
-    private static final Logger logger = LoggerFactory.getLogger(NettyTcpServerHandler.class);
+    private static final Logger logger = LoggerFactory.getLogger(NettyServerHandler.class);
 
-    private NettyTcpSource nettyTcpSource;
+    private NettySource nettySource;
 
-    public NettyTcpServerHandler(NettyTcpSource nettyTcpSource) {
-        this.nettyTcpSource = nettyTcpSource;
+    public NettyServerHandler(NettySource nettySource) {
+        this.nettySource = nettySource;
     }
 
     @Override
     protected void channelRead0(ChannelHandlerContext channelHandlerContext, byte[] bytes) throws Exception {
-        nettyTcpSource.consume(new NettyTcpRecord(Optional.ofNullable(""), bytes));
+        nettySource.consume(new NettyRecord(Optional.ofNullable(""), bytes));
     }
 
     @Override
@@ -54,7 +54,7 @@ public class NettyTcpServerHandler extends SimpleChannelInboundHandler<byte[]> {
     }
 
     @Data
-    static private class NettyTcpRecord implements Record<byte[]>, Serializable {
+    static private class NettyRecord implements Record<byte[]>, Serializable {
         private final Optional<String> key;
         private final byte[] value;
     }
diff --git a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/tcp/server/NettyTcpServer.java b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/tcp/server/NettyTcpServer.java
deleted file mode 100644
index b471b03..0000000
--- a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/tcp/server/NettyTcpServer.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.io.netty.tcp.server;
-
-import com.google.common.base.Preconditions;
-import io.netty.bootstrap.ServerBootstrap;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelOption;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.channel.socket.nio.NioServerSocketChannel;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.pulsar.io.netty.NettyTcpSource;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Netty Tcp Server to accept any incoming data through Tcp.
- */
-public class NettyTcpServer {
-
-    private static final Logger logger = LoggerFactory.getLogger(NettyTcpServer.class);
-
-    private String host;
-    private int port;
-    private NettyTcpSource nettyTcpSource;
-    private int numberOfThreads;
-    private EventLoopGroup bossGroup;
-    private EventLoopGroup workerGroup;
-
-    private NettyTcpServer(Builder builder) {
-        this.host = builder.host;
-        this.port = builder.port;
-        this.nettyTcpSource = builder.nettyTcpSource;
-        this.numberOfThreads = builder.numberOfThreads;
-    }
-
-    public void run() {
-        try {
-            bossGroup = new NioEventLoopGroup(this.numberOfThreads);
-            workerGroup = new NioEventLoopGroup(this.numberOfThreads);
-
-            ServerBootstrap serverBootstrap = new ServerBootstrap();
-            serverBootstrap.group(bossGroup, workerGroup)
-                    .channel(NioServerSocketChannel.class)
-                    .childHandler(new NettyChannelInitializer(new NettyTcpServerHandler(this.nettyTcpSource)))
-                    .option(ChannelOption.SO_BACKLOG, 1024)
-                    .childOption(ChannelOption.SO_KEEPALIVE, true);
-
-            ChannelFuture channelFuture = serverBootstrap.bind(this.host, this.port).sync();
-            channelFuture.channel().closeFuture().sync();
-        } catch(Exception ex) {
-            logger.error("Error occurred when Netty Tcp Server is running", ex);
-        } finally {
-            shutdownGracefully();
-        }
-    }
-
-    public void shutdownGracefully() {
-        if (workerGroup != null)
-            workerGroup.shutdownGracefully();
-        if (bossGroup != null)
-            bossGroup.shutdownGracefully();
-    }
-
-    /**
-     * Pulsar Tcp Server Builder.
-     */
-    public static class Builder {
-
-        private String host;
-        private int port;
-        private NettyTcpSource nettyTcpSource;
-        private int numberOfThreads;
-
-        public Builder setHost(String host) {
-            this.host = host;
-            return this;
-        }
-
-        public Builder setPort(int port) {
-            this.port = port;
-            return this;
-        }
-
-        public Builder setNettyTcpSource(NettyTcpSource nettyTcpSource) {
-            this.nettyTcpSource = nettyTcpSource;
-            return this;
-        }
-
-        public Builder setNumberOfThreads(int numberOfThreads) {
-            this.numberOfThreads = numberOfThreads;
-            return this;
-        }
-
-        public NettyTcpServer build() {
-            Preconditions.checkArgument(StringUtils.isNotBlank(host), "host cannot be blank/null");
-            Preconditions.checkArgument(this.port >= 1024, "port must be set equal or bigger than 1024");
-            Preconditions.checkNotNull(this.nettyTcpSource, "nettyTcpSource must be set");
-            Preconditions.checkArgument(this.numberOfThreads > 0,
-                    "numberOfThreads must be set as positive");
-
-            return new NettyTcpServer(this);
-        }
-    }
-
-}
\ No newline at end of file
diff --git a/pulsar-io/netty/src/main/resources/META-INF/services/pulsar-io.yaml b/pulsar-io/netty/src/main/resources/META-INF/services/pulsar-io.yaml
index 58744ce..22c4af4 100644
--- a/pulsar-io/netty/src/main/resources/META-INF/services/pulsar-io.yaml
+++ b/pulsar-io/netty/src/main/resources/META-INF/services/pulsar-io.yaml
@@ -17,6 +17,6 @@
 # under the License.
 #
 
-name: tcp
-description: Netty Tcp Source Connector
-sourceClass: org.apache.pulsar.io.netty.NettyTcpSource
+name: netty
+description: Netty Tcp or Udp Source Connector
+sourceClass: org.apache.pulsar.io.netty.NettySource
diff --git a/pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/NettyTcpSourceConfigTest.java b/pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/NettySourceConfigTest.java
similarity index 65%
rename from pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/NettyTcpSourceConfigTest.java
rename to pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/NettySourceConfigTest.java
index efc763a..034fa79 100644
--- a/pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/NettyTcpSourceConfigTest.java
+++ b/pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/NettySourceConfigTest.java
@@ -30,24 +30,27 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 
 /**
- * Tests for Netty Tcp Source Config
+ * Tests for Netty Tcp or Udp Source Config
  */
-public class NettyTcpSourceConfigTest {
+public class NettySourceConfigTest {
 
     private static final String LOCALHOST = "127.0.0.1";
+    private static final String TCP = "tcp";
 
     @Test
     public void testNettyTcpConfigLoadWithMap() throws IOException {
         Map<String, Object> map = new HashMap<>();
+        map.put("type", TCP);
         map.put("host", LOCALHOST);
         map.put("port", 10999);
         map.put("numberOfThreads", 1);
 
-        NettyTcpSourceConfig nettyTcpSourceConfig = NettyTcpSourceConfig.load(map);
-        assertNotNull(nettyTcpSourceConfig);
-        assertEquals(LOCALHOST, nettyTcpSourceConfig.getHost());
-        assertEquals(10999, nettyTcpSourceConfig.getPort());
-        assertEquals(1, nettyTcpSourceConfig.getNumberOfThreads());
+        NettySourceConfig nettySourceConfig = NettySourceConfig.load(map);
+        assertNotNull(nettySourceConfig);
+        assertEquals(TCP, nettySourceConfig.getType());
+        assertEquals(LOCALHOST, nettySourceConfig.getHost());
+        assertEquals(10999, nettySourceConfig.getPort());
+        assertEquals(1, nettySourceConfig.getNumberOfThreads());
     }
 
     @Test(expected = UnrecognizedPropertyException.class)
@@ -55,23 +58,24 @@ public class NettyTcpSourceConfigTest {
         Map<String, Object> map = new HashMap<>();
         map.put("invalidProperty", 1);
 
-        NettyTcpSourceConfig.load(map);
+        NettySourceConfig.load(map);
     }
 
     @Test
     public void testNettyTcpConfigLoadWithYamlFile() throws IOException {
-        File yamlFile = getFile("nettyTcpSourceConfig.yaml");
-        NettyTcpSourceConfig nettyTcpSourceConfig = NettyTcpSourceConfig.load(yamlFile.getAbsolutePath());
-        assertNotNull(nettyTcpSourceConfig);
-        assertEquals(LOCALHOST, nettyTcpSourceConfig.getHost());
-        assertEquals(10911, nettyTcpSourceConfig.getPort());
-        assertEquals(5, nettyTcpSourceConfig.getNumberOfThreads());
+        File yamlFile = getFile("nettySourceConfig.yaml");
+        NettySourceConfig nettySourceConfig = NettySourceConfig.load(yamlFile.getAbsolutePath());
+        assertNotNull(nettySourceConfig);
+        assertEquals(TCP, nettySourceConfig.getType());
+        assertEquals(LOCALHOST, nettySourceConfig.getHost());
+        assertEquals(10911, nettySourceConfig.getPort());
+        assertEquals(5, nettySourceConfig.getNumberOfThreads());
     }
 
     @Test(expected = UnrecognizedPropertyException.class)
     public void testNettyTcpConfigLoadWithYamlFileWhenInvalidPropertyIsSet() throws IOException {
-        File yamlFile = getFile("nettyTcpSourceConfigWithInvalidProperty.yaml");
-        NettyTcpSourceConfig.load(yamlFile.getAbsolutePath());
+        File yamlFile = getFile("nettySourceConfigWithInvalidProperty.yaml");
+        NettySourceConfig.load(yamlFile.getAbsolutePath());
     }
 
     private File getFile(String name) {
diff --git a/pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/tcp/server/NettyChannelInitializerTest.java b/pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/server/NettyChannelInitializerTest.java
similarity index 89%
rename from pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/tcp/server/NettyChannelInitializerTest.java
rename to pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/server/NettyChannelInitializerTest.java
index a5243e6..b8d6dd4 100644
--- a/pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/tcp/server/NettyChannelInitializerTest.java
+++ b/pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/server/NettyChannelInitializerTest.java
@@ -16,10 +16,10 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.io.netty.tcp.server;
+package org.apache.pulsar.io.netty.server;
 
 import io.netty.channel.socket.nio.NioSocketChannel;
-import org.apache.pulsar.io.netty.NettyTcpSource;
+import org.apache.pulsar.io.netty.NettySource;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
@@ -35,7 +35,7 @@ public class NettyChannelInitializerTest {
         NioSocketChannel channel = new NioSocketChannel();
 
         NettyChannelInitializer nettyChannelInitializer = new NettyChannelInitializer(
-                new NettyTcpServerHandler(new NettyTcpSource()));
+                new NettyServerHandler(new NettySource()));
         nettyChannelInitializer.initChannel(channel);
 
         assertNotNull(channel.pipeline().toMap());
diff --git a/pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/tcp/server/NettyTcpServerTest.java b/pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/server/NettyServerTest.java
similarity index 51%
rename from pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/tcp/server/NettyTcpServerTest.java
rename to pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/server/NettyServerTest.java
index 0c2f56b..8041570 100644
--- a/pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/tcp/server/NettyTcpServerTest.java
+++ b/pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/server/NettyServerTest.java
@@ -16,27 +16,71 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.io.netty.tcp.server;
+package org.apache.pulsar.io.netty.server;
 
-import org.apache.pulsar.io.netty.NettyTcpSource;
+import org.apache.pulsar.io.netty.NettySource;
+import org.apache.pulsar.io.netty.NettySourceConfig;
 import org.junit.Test;
 
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 
 /**
- * Tests for Netty Tcp Server
+ * Tests for Netty Tcp or Udp Server
  */
-public class NettyTcpServerTest {
+public class NettyServerTest {
 
     private static final String LOCALHOST = "127.0.0.1";
+    private static final String TCP = "TCP";
+    private static final String UDP = "UDP";
 
     @Test
     public void testNettyTcpServerConstructor() {
-        NettyTcpServer nettyTcpServer = new NettyTcpServer.Builder()
+        NettyServer nettyTcpServer = new NettyServer.Builder()
+                .setType(NettyServer.Type.valueOf(TCP))
                 .setHost(LOCALHOST)
                 .setPort(10999)
                 .setNumberOfThreads(2)
-                .setNettyTcpSource(new NettyTcpSource())
+                .setNettySource(new NettySource())
+                .build();
+
+        assertNotNull(nettyTcpServer);
+    }
+
+    @Test
+    public void testNettyUdpServerConstructor() {
+        NettyServer nettyUdpServer = new NettyServer.Builder()
+                .setType(NettyServer.Type.valueOf(UDP))
+                .setHost(LOCALHOST)
+                .setPort(10999)
+                .setNumberOfThreads(2)
+                .setNettySource(new NettySource())
+                .build();
+
+        assertNotNull(nettyUdpServer);
+    }
+
+    @Test
+    public void testNettyTcpServerByNettySourceConfig() throws IOException {
+        Map<String, Object> map = new HashMap<>();
+        map.put("type", "tcp");
+        map.put("host", LOCALHOST);
+        map.put("port", 10999);
+        map.put("numberOfThreads", 1);
+
+        NettySourceConfig nettySourceConfig = NettySourceConfig.load(map);
+
+        // test NettySource run function NettyServer Builder
+        NettyServer nettyTcpServer = new NettyServer.Builder()
+                .setType(NettyServer.Type.valueOf(nettySourceConfig.getType().toUpperCase()))
+                .setHost(nettySourceConfig.getHost())
+                .setPort(nettySourceConfig.getPort())
+                .setNumberOfThreads(nettySourceConfig.getNumberOfThreads())
+                .setNettySource(new NettySource())
                 .build();
 
         assertNotNull(nettyTcpServer);
@@ -44,36 +88,38 @@ public class NettyTcpServerTest {
 
     @Test(expected = IllegalArgumentException.class)
     public void testNettyTcpServerConstructorWhenHostIsNotSet() {
-        new NettyTcpServer.Builder()
+        new NettyServer.Builder()
+                .setType(NettyServer.Type.valueOf(TCP))
                 .setPort(10999)
                 .setNumberOfThreads(2)
-                .setNettyTcpSource(new NettyTcpSource())
+                .setNettySource(new NettySource())
                 .build();
     }
 
     @Test(expected = IllegalArgumentException.class)
     public void testNettyTcpServerConstructorWhenPortIsNotSet() {
-        new NettyTcpServer.Builder()
+        new NettyServer.Builder()
+                .setType(NettyServer.Type.valueOf(TCP))
                 .setHost(LOCALHOST)
                 .setNumberOfThreads(2)
-                .setNettyTcpSource(new NettyTcpSource())
+                .setNettySource(new NettySource())
                 .build();
     }
 
-
     @Test(expected = IllegalArgumentException.class)
     public void testNettyTcpServerConstructorWhenNumberOfThreadsIsNotSet() {
-        new NettyTcpServer.Builder()
+        new NettyServer.Builder()
+                .setType(NettyServer.Type.valueOf(TCP))
                 .setHost(LOCALHOST)
                 .setPort(10999)
-                .setNettyTcpSource(new NettyTcpSource())
+                .setNettySource(new NettySource())
                 .build();
     }
 
-
     @Test(expected = NullPointerException.class)
     public void testNettyTcpServerConstructorWhenNettyTcpSourceIsNotSet() {
-        new NettyTcpServer.Builder()
+        new NettyServer.Builder()
+                .setType(NettyServer.Type.valueOf(TCP))
                 .setHost(LOCALHOST)
                 .setPort(10999)
                 .setNumberOfThreads(2)
@@ -82,41 +128,45 @@ public class NettyTcpServerTest {
 
     @Test(expected = IllegalArgumentException.class)
     public void testNettyTcpServerWhenHostIsSetAsBlank() {
-        new NettyTcpServer.Builder()
+        new NettyServer.Builder()
+                .setType(NettyServer.Type.valueOf(TCP))
                 .setHost(" ")
                 .setPort(10999)
                 .setNumberOfThreads(2)
-                .setNettyTcpSource(new NettyTcpSource())
+                .setNettySource(new NettySource())
                 .build();
     }
 
     @Test(expected = IllegalArgumentException.class)
     public void testNettyTcpServerWhenPortIsSetAsZero() {
-        new NettyTcpServer.Builder()
+        new NettyServer.Builder()
+                .setType(NettyServer.Type.valueOf(TCP))
                 .setHost(LOCALHOST)
                 .setPort(0)
                 .setNumberOfThreads(2)
-                .setNettyTcpSource(new NettyTcpSource())
+                .setNettySource(new NettySource())
                 .build();
     }
 
     @Test(expected = IllegalArgumentException.class)
     public void testNettyTcpServerWhenPortIsSetLowerThan1024() {
-        new NettyTcpServer.Builder()
+        new NettyServer.Builder()
+                .setType(NettyServer.Type.valueOf(TCP))
                 .setHost(LOCALHOST)
                 .setPort(1022)
                 .setNumberOfThreads(2)
-                .setNettyTcpSource(new NettyTcpSource())
+                .setNettySource(new NettySource())
                 .build();
     }
 
     @Test(expected = IllegalArgumentException.class)
     public void testNettyTcpServerWhenNumberOfThreadsIsSetAsZero() {
-        new NettyTcpServer.Builder()
+        new NettyServer.Builder()
+                .setType(NettyServer.Type.valueOf(TCP))
                 .setHost(LOCALHOST)
                 .setPort(10999)
                 .setNumberOfThreads(0)
-                .setNettyTcpSource(new NettyTcpSource())
+                .setNettySource(new NettySource())
                 .build();
     }
 
diff --git a/pulsar-io/netty/src/test/resources/nettyTcpSourceConfig.yaml b/pulsar-io/netty/src/test/resources/nettySourceConfig.yaml
similarity index 98%
rename from pulsar-io/netty/src/test/resources/nettyTcpSourceConfig.yaml
rename to pulsar-io/netty/src/test/resources/nettySourceConfig.yaml
index ca748cc..eff1a9c 100644
--- a/pulsar-io/netty/src/test/resources/nettyTcpSourceConfig.yaml
+++ b/pulsar-io/netty/src/test/resources/nettySourceConfig.yaml
@@ -18,7 +18,8 @@
 #
 
 {
+"type": "tcp",
 "host": "127.0.0.1",
 "port": "10911",
 "numberOfThreads": "5"
-}
\ No newline at end of file
+}
diff --git a/pulsar-io/netty/src/test/resources/nettyTcpSourceConfigWithInvalidProperty.yaml b/pulsar-io/netty/src/test/resources/nettySourceConfigWithInvalidProperty.yaml
similarity index 98%
rename from pulsar-io/netty/src/test/resources/nettyTcpSourceConfigWithInvalidProperty.yaml
rename to pulsar-io/netty/src/test/resources/nettySourceConfigWithInvalidProperty.yaml
index 8a2bb92..b1f90c2 100644
--- a/pulsar-io/netty/src/test/resources/nettyTcpSourceConfigWithInvalidProperty.yaml
+++ b/pulsar-io/netty/src/test/resources/nettySourceConfigWithInvalidProperty.yaml
@@ -18,7 +18,8 @@
 #
 
 {
+"type": "tcp",
 "host": "127.0.0.1",
 "port": "10911",
 "invalidProperty": "5"
-}
\ No newline at end of file
+}
diff --git a/site2/docs/io-connectors.md b/site2/docs/io-connectors.md
index 0250ebc..05a43dc 100644
--- a/site2/docs/io-connectors.md
+++ b/site2/docs/io-connectors.md
@@ -17,4 +17,4 @@ Pulsar Functions cluster.
 - [RabbitMQ Source Connector](io-rabbitmq.md#source)
 - [Twitter Firehose Source Connector](io-twitter.md)
 - [CDC Source Connector based on Debezium](io-cdc.md)
-- [Netty Tcp Source Connector](io-tcp.md#source)
+- [Netty Tcp or Udp Source Connector](io-netty.md#source)
diff --git a/site2/docs/io-tcp.md b/site2/docs/io-netty.md
similarity index 69%
rename from site2/docs/io-tcp.md
rename to site2/docs/io-netty.md
index 8bf3a89..479752f 100644
--- a/site2/docs/io-tcp.md
+++ b/site2/docs/io-netty.md
@@ -1,12 +1,12 @@
 ---
-id: io-tcp
-title: Netty Tcp Connector
-sidebar_label: Netty Tcp Connector
+id: io-netty
+title: Netty Tcp or Udp Connector
+sidebar_label: Netty Tcp or Udp Connector
 ---
 
 ## Source
 
-The Netty Tcp Source connector is used to listen Tcp messages from Tcp Client and write them to user-defined Pulsar topic.
+The Netty Tcp or Udp Source connector is used to listen Tcp/Udp messages from Tcp/Udp Client and write them to user-defined Pulsar topic.
 Also, this connector is suggested to be used in a containerized (e.g. k8s) deployment.
 Otherwise, if the connector is running in process or thread mode, the instances may be conflicting on listening to ports.
 
@@ -14,6 +14,7 @@ Otherwise, if the connector is running in process or thread mode, the instances
 
 | Name | Required | Default | Description |
 |------|----------|---------|-------------|
+| `type` | `false` | `tcp` | The tcp or udp network protocol required by netty. |
 | `host` | `false` | `127.0.0.1` | The host name or address that the source instance to listen on. |
 | `port` | `false` | `10999` | The port that the source instance to listen on. |
 | `numberOfThreads` | `false` | `1` | The number of threads of Netty Tcp Server to accept incoming connections and handle the traffic of the accepted connections. |