You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/12/16 01:35:45 UTC

[GitHub] sijie closed pull request #3179: [Pulsar-IO] Add Netty Tcp Source Support

sijie closed pull request #3179: [Pulsar-IO] Add Netty Tcp Source Support
URL: https://github.com/apache/pulsar/pull/3179
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-io/netty/pom.xml b/pulsar-io/netty/pom.xml
new file mode 100644
index 0000000000..40dabc9445
--- /dev/null
+++ b/pulsar-io/netty/pom.xml
@@ -0,0 +1,86 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project
+        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
+        xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.pulsar</groupId>
+        <artifactId>pulsar-io</artifactId>
+        <version>2.3.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>pulsar-io-netty</artifactId>
+    <name>Pulsar IO :: Netty</name>
+
+    <dependencies>
+
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>pulsar-io-core</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>com.fasterxml.jackson.dataformat</groupId>
+            <artifactId>jackson-dataformat-yaml</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-all</artifactId>
+            <version>${netty.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.12</version>
+            <scope>test</scope>
+        </dependency>
+
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.nifi</groupId>
+                <artifactId>nifi-nar-maven-plugin</artifactId>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>
diff --git a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/NettyTcpSource.java b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/NettyTcpSource.java
new file mode 100644
index 0000000000..3cef730dc3
--- /dev/null
+++ b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/NettyTcpSource.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.netty;
+
+import org.apache.pulsar.io.core.PushSource;
+import org.apache.pulsar.io.core.SourceContext;
+import org.apache.pulsar.io.core.annotations.Connector;
+import org.apache.pulsar.io.core.annotations.IOType;
+import org.apache.pulsar.io.netty.tcp.server.NettyTcpServer;
+import java.util.Map;
+
+/**
+ * A simple Netty Tcp Source connector to listen Tcp messages and write to user-defined Pulsar topic
+ */
+@Connector(
+        name = "tcp",
+        type = IOType.SOURCE,
+        help = "A simple Netty Tcp Source connector to listen Tcp messages and write to user-defined Pulsar topic",
+        configClass = NettyTcpSourceConfig.class)
+public class NettyTcpSource extends PushSource<byte[]> {
+
+    private NettyTcpServer nettyTcpServer;
+    private Thread thread;
+
+    @Override
+    public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
+        NettyTcpSourceConfig nettyTcpSourceConfig = NettyTcpSourceConfig.load(config);
+
+        thread = new Thread(new PulsarTcpServerRunnable(nettyTcpSourceConfig, this));
+        thread.start();
+    }
+
+    @Override
+    public void close() throws Exception {
+        nettyTcpServer.shutdownGracefully();
+    }
+
+    private class PulsarTcpServerRunnable implements Runnable {
+
+        private NettyTcpSourceConfig nettyTcpSourceConfig;
+        private NettyTcpSource nettyTcpSource;
+
+        public PulsarTcpServerRunnable(NettyTcpSourceConfig nettyTcpSourceConfig, NettyTcpSource nettyTcpSource) {
+            this.nettyTcpSourceConfig = nettyTcpSourceConfig;
+            this.nettyTcpSource = nettyTcpSource;
+        }
+
+        @Override
+        public void run() {
+            nettyTcpServer = new NettyTcpServer.Builder()
+                .setHost(nettyTcpSourceConfig.getHost())
+                .setPort(nettyTcpSourceConfig.getPort())
+                .setNumberOfThreads(nettyTcpSourceConfig.getNumberOfThreads())
+                .setNettyTcpSource(nettyTcpSource)
+                .build();
+
+            nettyTcpServer.run();
+        }
+    }
+
+}
diff --git a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/NettyTcpSourceConfig.java b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/NettyTcpSourceConfig.java
new file mode 100644
index 0000000000..07b3cf8b8f
--- /dev/null
+++ b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/NettyTcpSourceConfig.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.netty;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
+import lombok.*;
+import lombok.experimental.Accessors;
+import org.apache.pulsar.io.core.annotations.FieldDoc;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Map;
+
+/**
+ * Netty Tcp Source Connector Config.
+ */
+@Data
+@Setter
+@Getter
+@EqualsAndHashCode
+@ToString
+@Accessors(chain = true)
+public class NettyTcpSourceConfig implements Serializable {
+
+    private static final long serialVersionUID = -7116130435021510496L;
+
+    @FieldDoc(
+            required = true,
+            defaultValue = "127.0.0.1",
+            help = "The host name or address that the source instance to listen on")
+    private String host = "127.0.0.1";
+
+    @FieldDoc(
+            required = true,
+            defaultValue = "10999",
+            help = "The port that the source instance to listen on")
+    private int port = 10999;
+
+    @FieldDoc(
+            required = true,
+            defaultValue = "1",
+            help = "The number of threads of Netty Tcp Server to accept incoming connections and " +
+                    "handle the traffic of the accepted connections")
+    private int numberOfThreads = 1;
+
+    public static NettyTcpSourceConfig load(Map<String, Object> map) throws IOException {
+        ObjectMapper mapper = new ObjectMapper();
+        return mapper.readValue(new ObjectMapper().writeValueAsString(map), NettyTcpSourceConfig.class);
+    }
+
+    public static NettyTcpSourceConfig load(String yamlFile) throws IOException {
+        ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
+        return mapper.readValue(new File(yamlFile), NettyTcpSourceConfig.class);
+    }
+
+}
diff --git a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/tcp/server/NettyChannelInitializer.java b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/tcp/server/NettyChannelInitializer.java
new file mode 100644
index 0000000000..f88f514a4a
--- /dev/null
+++ b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/tcp/server/NettyChannelInitializer.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.netty.tcp.server;
+
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.codec.bytes.ByteArrayDecoder;
+
+/**
+ * Netty Channel Initializer to register decoder and handler
+ */
+public class NettyChannelInitializer extends ChannelInitializer<SocketChannel> {
+
+    private ChannelInboundHandlerAdapter handler;
+
+    public NettyChannelInitializer(ChannelInboundHandlerAdapter handler) {
+        this.handler = handler;
+    }
+
+    @Override
+    protected void initChannel(SocketChannel socketChannel) throws Exception {
+        socketChannel.pipeline().addLast(new ByteArrayDecoder());
+        socketChannel.pipeline().addLast(this.handler);
+    }
+
+}
diff --git a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/tcp/server/NettyTcpServer.java b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/tcp/server/NettyTcpServer.java
new file mode 100644
index 0000000000..b471b03697
--- /dev/null
+++ b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/tcp/server/NettyTcpServer.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.netty.tcp.server;
+
+import com.google.common.base.Preconditions;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.io.netty.NettyTcpSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Netty Tcp Server to accept any incoming data through Tcp.
+ */
+public class NettyTcpServer {
+
+    private static final Logger logger = LoggerFactory.getLogger(NettyTcpServer.class);
+
+    private String host;
+    private int port;
+    private NettyTcpSource nettyTcpSource;
+    private int numberOfThreads;
+    private EventLoopGroup bossGroup;
+    private EventLoopGroup workerGroup;
+
+    private NettyTcpServer(Builder builder) {
+        this.host = builder.host;
+        this.port = builder.port;
+        this.nettyTcpSource = builder.nettyTcpSource;
+        this.numberOfThreads = builder.numberOfThreads;
+    }
+
+    public void run() {
+        try {
+            bossGroup = new NioEventLoopGroup(this.numberOfThreads);
+            workerGroup = new NioEventLoopGroup(this.numberOfThreads);
+
+            ServerBootstrap serverBootstrap = new ServerBootstrap();
+            serverBootstrap.group(bossGroup, workerGroup)
+                    .channel(NioServerSocketChannel.class)
+                    .childHandler(new NettyChannelInitializer(new NettyTcpServerHandler(this.nettyTcpSource)))
+                    .option(ChannelOption.SO_BACKLOG, 1024)
+                    .childOption(ChannelOption.SO_KEEPALIVE, true);
+
+            ChannelFuture channelFuture = serverBootstrap.bind(this.host, this.port).sync();
+            channelFuture.channel().closeFuture().sync();
+        } catch(Exception ex) {
+            logger.error("Error occurred when Netty Tcp Server is running", ex);
+        } finally {
+            shutdownGracefully();
+        }
+    }
+
+    public void shutdownGracefully() {
+        if (workerGroup != null)
+            workerGroup.shutdownGracefully();
+        if (bossGroup != null)
+            bossGroup.shutdownGracefully();
+    }
+
+    /**
+     * Pulsar Tcp Server Builder.
+     */
+    public static class Builder {
+
+        private String host;
+        private int port;
+        private NettyTcpSource nettyTcpSource;
+        private int numberOfThreads;
+
+        public Builder setHost(String host) {
+            this.host = host;
+            return this;
+        }
+
+        public Builder setPort(int port) {
+            this.port = port;
+            return this;
+        }
+
+        public Builder setNettyTcpSource(NettyTcpSource nettyTcpSource) {
+            this.nettyTcpSource = nettyTcpSource;
+            return this;
+        }
+
+        public Builder setNumberOfThreads(int numberOfThreads) {
+            this.numberOfThreads = numberOfThreads;
+            return this;
+        }
+
+        public NettyTcpServer build() {
+            Preconditions.checkArgument(StringUtils.isNotBlank(host), "host cannot be blank/null");
+            Preconditions.checkArgument(this.port >= 1024, "port must be set equal or bigger than 1024");
+            Preconditions.checkNotNull(this.nettyTcpSource, "nettyTcpSource must be set");
+            Preconditions.checkArgument(this.numberOfThreads > 0,
+                    "numberOfThreads must be set as positive");
+
+            return new NettyTcpServer(this);
+        }
+    }
+
+}
\ No newline at end of file
diff --git a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/tcp/server/NettyTcpServerHandler.java b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/tcp/server/NettyTcpServerHandler.java
new file mode 100644
index 0000000000..2fd2cf3c69
--- /dev/null
+++ b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/tcp/server/NettyTcpServerHandler.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.netty.tcp.server;
+
+import io.netty.channel.*;
+import lombok.Data;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.netty.NettyTcpSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.Optional;
+
+/**
+ * Handles a server-side channel
+ */
+@ChannelHandler.Sharable
+public class NettyTcpServerHandler extends SimpleChannelInboundHandler<byte[]> {
+
+    private static final Logger logger = LoggerFactory.getLogger(NettyTcpServerHandler.class);
+
+    private NettyTcpSource nettyTcpSource;
+
+    public NettyTcpServerHandler(NettyTcpSource nettyTcpSource) {
+        this.nettyTcpSource = nettyTcpSource;
+    }
+
+    @Override
+    protected void channelRead0(ChannelHandlerContext channelHandlerContext, byte[] bytes) throws Exception {
+        nettyTcpSource.consume(new NettyTcpRecord(Optional.ofNullable(""), bytes));
+    }
+
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+        logger.error("Error when processing incoming data", cause);
+        ctx.close();
+    }
+
+    @Data
+    static private class NettyTcpRecord implements Record<byte[]>, Serializable {
+        private final Optional<String> key;
+        private final byte[] value;
+    }
+
+}
diff --git a/pulsar-io/netty/src/main/resources/META-INF/services/pulsar-io.yaml b/pulsar-io/netty/src/main/resources/META-INF/services/pulsar-io.yaml
new file mode 100644
index 0000000000..58744ce794
--- /dev/null
+++ b/pulsar-io/netty/src/main/resources/META-INF/services/pulsar-io.yaml
@@ -0,0 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+name: tcp
+description: Netty Tcp Source Connector
+sourceClass: org.apache.pulsar.io.netty.NettyTcpSource
diff --git a/pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/NettyTcpSourceConfigTest.java b/pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/NettyTcpSourceConfigTest.java
new file mode 100644
index 0000000000..efc763a907
--- /dev/null
+++ b/pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/NettyTcpSourceConfigTest.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.netty;
+
+import com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * Tests for Netty Tcp Source Config
+ */
+public class NettyTcpSourceConfigTest {
+
+    private static final String LOCALHOST = "127.0.0.1";
+
+    @Test
+    public void testNettyTcpConfigLoadWithMap() throws IOException {
+        Map<String, Object> map = new HashMap<>();
+        map.put("host", LOCALHOST);
+        map.put("port", 10999);
+        map.put("numberOfThreads", 1);
+
+        NettyTcpSourceConfig nettyTcpSourceConfig = NettyTcpSourceConfig.load(map);
+        assertNotNull(nettyTcpSourceConfig);
+        assertEquals(LOCALHOST, nettyTcpSourceConfig.getHost());
+        assertEquals(10999, nettyTcpSourceConfig.getPort());
+        assertEquals(1, nettyTcpSourceConfig.getNumberOfThreads());
+    }
+
+    @Test(expected = UnrecognizedPropertyException.class)
+    public void testNettyTcpConfigLoadWithMapWhenInvalidPropertyIsSet() throws IOException {
+        Map<String, Object> map = new HashMap<>();
+        map.put("invalidProperty", 1);
+
+        NettyTcpSourceConfig.load(map);
+    }
+
+    @Test
+    public void testNettyTcpConfigLoadWithYamlFile() throws IOException {
+        File yamlFile = getFile("nettyTcpSourceConfig.yaml");
+        NettyTcpSourceConfig nettyTcpSourceConfig = NettyTcpSourceConfig.load(yamlFile.getAbsolutePath());
+        assertNotNull(nettyTcpSourceConfig);
+        assertEquals(LOCALHOST, nettyTcpSourceConfig.getHost());
+        assertEquals(10911, nettyTcpSourceConfig.getPort());
+        assertEquals(5, nettyTcpSourceConfig.getNumberOfThreads());
+    }
+
+    @Test(expected = UnrecognizedPropertyException.class)
+    public void testNettyTcpConfigLoadWithYamlFileWhenInvalidPropertyIsSet() throws IOException {
+        File yamlFile = getFile("nettyTcpSourceConfigWithInvalidProperty.yaml");
+        NettyTcpSourceConfig.load(yamlFile.getAbsolutePath());
+    }
+
+    private File getFile(String name) {
+        ClassLoader classLoader = getClass().getClassLoader();
+        return new File(classLoader.getResource(name).getFile());
+    }
+
+}
diff --git a/pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/tcp/server/NettyChannelInitializerTest.java b/pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/tcp/server/NettyChannelInitializerTest.java
new file mode 100644
index 0000000000..a5243e6640
--- /dev/null
+++ b/pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/tcp/server/NettyChannelInitializerTest.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.netty.tcp.server;
+
+import io.netty.channel.socket.nio.NioSocketChannel;
+import org.apache.pulsar.io.netty.NettyTcpSource;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * Tests for Netty Channel Initializer
+ */
+public class NettyChannelInitializerTest {
+
+    @Test
+    public void testChannelInitializer() throws Exception {
+        NioSocketChannel channel = new NioSocketChannel();
+
+        NettyChannelInitializer nettyChannelInitializer = new NettyChannelInitializer(
+                new NettyTcpServerHandler(new NettyTcpSource()));
+        nettyChannelInitializer.initChannel(channel);
+
+        assertNotNull(channel.pipeline().toMap());
+        assertEquals(2, channel.pipeline().toMap().size());
+    }
+
+}
diff --git a/pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/tcp/server/NettyTcpServerTest.java b/pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/tcp/server/NettyTcpServerTest.java
new file mode 100644
index 0000000000..0c2f56b083
--- /dev/null
+++ b/pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/tcp/server/NettyTcpServerTest.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.netty.tcp.server;
+
+import org.apache.pulsar.io.netty.NettyTcpSource;
+import org.junit.Test;
+
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * Tests for Netty Tcp Server
+ */
+public class NettyTcpServerTest {
+
+    private static final String LOCALHOST = "127.0.0.1";
+
+    @Test
+    public void testNettyTcpServerConstructor() {
+        NettyTcpServer nettyTcpServer = new NettyTcpServer.Builder()
+                .setHost(LOCALHOST)
+                .setPort(10999)
+                .setNumberOfThreads(2)
+                .setNettyTcpSource(new NettyTcpSource())
+                .build();
+
+        assertNotNull(nettyTcpServer);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testNettyTcpServerConstructorWhenHostIsNotSet() {
+        new NettyTcpServer.Builder()
+                .setPort(10999)
+                .setNumberOfThreads(2)
+                .setNettyTcpSource(new NettyTcpSource())
+                .build();
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testNettyTcpServerConstructorWhenPortIsNotSet() {
+        new NettyTcpServer.Builder()
+                .setHost(LOCALHOST)
+                .setNumberOfThreads(2)
+                .setNettyTcpSource(new NettyTcpSource())
+                .build();
+    }
+
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testNettyTcpServerConstructorWhenNumberOfThreadsIsNotSet() {
+        new NettyTcpServer.Builder()
+                .setHost(LOCALHOST)
+                .setPort(10999)
+                .setNettyTcpSource(new NettyTcpSource())
+                .build();
+    }
+
+
+    @Test(expected = NullPointerException.class)
+    public void testNettyTcpServerConstructorWhenNettyTcpSourceIsNotSet() {
+        new NettyTcpServer.Builder()
+                .setHost(LOCALHOST)
+                .setPort(10999)
+                .setNumberOfThreads(2)
+                .build();
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testNettyTcpServerWhenHostIsSetAsBlank() {
+        new NettyTcpServer.Builder()
+                .setHost(" ")
+                .setPort(10999)
+                .setNumberOfThreads(2)
+                .setNettyTcpSource(new NettyTcpSource())
+                .build();
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testNettyTcpServerWhenPortIsSetAsZero() {
+        new NettyTcpServer.Builder()
+                .setHost(LOCALHOST)
+                .setPort(0)
+                .setNumberOfThreads(2)
+                .setNettyTcpSource(new NettyTcpSource())
+                .build();
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testNettyTcpServerWhenPortIsSetLowerThan1024() {
+        new NettyTcpServer.Builder()
+                .setHost(LOCALHOST)
+                .setPort(1022)
+                .setNumberOfThreads(2)
+                .setNettyTcpSource(new NettyTcpSource())
+                .build();
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testNettyTcpServerWhenNumberOfThreadsIsSetAsZero() {
+        new NettyTcpServer.Builder()
+                .setHost(LOCALHOST)
+                .setPort(10999)
+                .setNumberOfThreads(0)
+                .setNettyTcpSource(new NettyTcpSource())
+                .build();
+    }
+
+}
diff --git a/pulsar-io/netty/src/test/resources/nettyTcpSourceConfig.yaml b/pulsar-io/netty/src/test/resources/nettyTcpSourceConfig.yaml
new file mode 100644
index 0000000000..ca748cc4ad
--- /dev/null
+++ b/pulsar-io/netty/src/test/resources/nettyTcpSourceConfig.yaml
@@ -0,0 +1,24 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+{
+"host": "127.0.0.1",
+"port": "10911",
+"numberOfThreads": "5"
+}
\ No newline at end of file
diff --git a/pulsar-io/netty/src/test/resources/nettyTcpSourceConfigWithInvalidProperty.yaml b/pulsar-io/netty/src/test/resources/nettyTcpSourceConfigWithInvalidProperty.yaml
new file mode 100644
index 0000000000..8a2bb92d19
--- /dev/null
+++ b/pulsar-io/netty/src/test/resources/nettyTcpSourceConfigWithInvalidProperty.yaml
@@ -0,0 +1,24 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+{
+"host": "127.0.0.1",
+"port": "10911",
+"invalidProperty": "5"
+}
\ No newline at end of file
diff --git a/pulsar-io/pom.xml b/pulsar-io/pom.xml
index 28a7f12a09..a9ff107214 100644
--- a/pulsar-io/pom.xml
+++ b/pulsar-io/pom.xml
@@ -47,6 +47,7 @@
     <module>kafka-connect-adaptor</module>
     <module>debezium</module>
     <module>canal</module>
+    <module>netty</module>
   </modules>
 
 </project>
diff --git a/site2/docs/io-connectors.md b/site2/docs/io-connectors.md
index 92a19dd70e..0250ebc544 100644
--- a/site2/docs/io-connectors.md
+++ b/site2/docs/io-connectors.md
@@ -17,3 +17,4 @@ Pulsar Functions cluster.
 - [RabbitMQ Source Connector](io-rabbitmq.md#source)
 - [Twitter Firehose Source Connector](io-twitter.md)
 - [CDC Source Connector based on Debezium](io-cdc.md)
+- [Netty Tcp Source Connector](io-tcp.md#source)
diff --git a/site2/docs/io-tcp.md b/site2/docs/io-tcp.md
new file mode 100644
index 0000000000..8bf3a8959c
--- /dev/null
+++ b/site2/docs/io-tcp.md
@@ -0,0 +1,19 @@
+---
+id: io-tcp
+title: Netty Tcp Connector
+sidebar_label: Netty Tcp Connector
+---
+
+## Source
+
+The Netty Tcp Source connector is used to listen Tcp messages from Tcp Client and write them to user-defined Pulsar topic.
+Also, this connector is suggested to be used in a containerized (e.g. k8s) deployment.
+Otherwise, if the connector is running in process or thread mode, the instances may be conflicting on listening to ports.
+
+### Source Configuration Options
+
+| Name | Required | Default | Description |
+|------|----------|---------|-------------|
+| `host` | `false` | `127.0.0.1` | The host name or address that the source instance to listen on. |
+| `port` | `false` | `10999` | The port that the source instance to listen on. |
+| `numberOfThreads` | `false` | `1` | The number of threads of Netty Tcp Server to accept incoming connections and handle the traffic of the accepted connections. |


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services