You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2019/12/14 02:40:48 UTC

[pulsar] branch master updated: [Issue 5827][Issue 5828][netty] Fixes for UDP protocol support in netty connector (#5829)

This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new b52449a  [Issue 5827][Issue 5828][netty] Fixes for UDP protocol support in netty connector (#5829)
b52449a is described below

commit b52449a6604cbdc4fe70bdfd8ecc9456d6b42ece
Author: gbensa <58...@users.noreply.github.com>
AuthorDate: Sat Dec 14 03:40:39 2019 +0100

    [Issue 5827][Issue 5828][netty] Fixes for UDP protocol support in netty connector (#5829)
    
    
    ### Motivation
    The UDP protocol is not working for the netty connector.
    
    ### Modifications
    Added a specific handler for UDP and used Channel instead of SocketChannel in NettyChannelInitializer.
    
    Successfully tested
    
    * Fixes for UDP protocol support in netty connector
    
    * Added specific handlers for UDP and TCP netty connector
---
 .../apache/pulsar/io/netty/server/NettyServer.java | 21 +++++++++------
 .../NettyTCPChannelInitializer.java}               | 14 +++++-----
 .../NettyTCPServerHandler.java}                    | 22 ++++++++--------
 .../package-info.java}                             | 26 +------------------
 .../NettyUDPChannelInitializer.java}               | 14 +++++-----
 .../NettyUDPServerHandler.java}                    | 30 ++++++++++++----------
 .../package-info.java}                             | 26 +------------------
 ...tyChannelInitializer.java => package-info.java} | 25 ------------------
 .../NettyTCPChannelInitializerTest.java}           | 21 +++++++++------
 .../NettyUDPChannelInitializerTest.java}           | 21 +++++++--------
 10 files changed, 79 insertions(+), 141 deletions(-)

diff --git a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyServer.java b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyServer.java
index 775b6f4..60c9c2d 100644
--- a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyServer.java
+++ b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyServer.java
@@ -21,6 +21,17 @@ package org.apache.pulsar.io.netty.server;
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.io.netty.NettySource;
+import org.apache.pulsar.io.netty.http.NettyHttpChannelInitializer;
+import org.apache.pulsar.io.netty.http.NettyHttpServerHandler;
+import org.apache.pulsar.io.netty.tcp.NettyTCPChannelInitializer;
+import org.apache.pulsar.io.netty.tcp.NettyTCPServerHandler;
+import org.apache.pulsar.io.netty.udp.NettyUDPChannelInitializer;
+import org.apache.pulsar.io.netty.udp.NettyUDPServerHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import io.netty.bootstrap.Bootstrap;
 import io.netty.bootstrap.ServerBootstrap;
 import io.netty.channel.ChannelFuture;
@@ -30,12 +41,6 @@ import io.netty.channel.EventLoopGroup;
 import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.socket.nio.NioDatagramChannel;
 import io.netty.channel.socket.nio.NioServerSocketChannel;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.pulsar.io.netty.NettySource;
-import org.apache.pulsar.io.netty.http.NettyHttpChannelInitializer;
-import org.apache.pulsar.io.netty.http.NettyHttpServerHandler;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * Netty Server to accept incoming data via the configured type.
@@ -96,7 +101,7 @@ public class NettyServer {
         Bootstrap bootstrap = new Bootstrap();
         bootstrap.group(workerGroup);
         bootstrap.channel(NioDatagramChannel.class);
-        bootstrap.handler(new NettyChannelInitializer(new NettyServerHandler(this.nettySource)))
+        bootstrap.handler(new NettyUDPChannelInitializer(new NettyUDPServerHandler(this.nettySource)))
                 .option(ChannelOption.SO_BACKLOG, 1024);
 
         ChannelFuture channelFuture = bootstrap.bind(this.host, this.port).sync();
@@ -105,7 +110,7 @@ public class NettyServer {
 
     private void runTcp() throws InterruptedException {
         ServerBootstrap serverBootstrap = getServerBootstrap(
-                new NettyChannelInitializer(new NettyServerHandler(this.nettySource)));
+                new NettyTCPChannelInitializer(new NettyTCPServerHandler(this.nettySource)));
 
         ChannelFuture channelFuture = serverBootstrap.bind(this.host, this.port).sync();
         channelFuture.channel().closeFuture().sync();
diff --git a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyChannelInitializer.java b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/tcp/NettyTCPChannelInitializer.java
similarity index 72%
copy from pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyChannelInitializer.java
copy to pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/tcp/NettyTCPChannelInitializer.java
index b9a7b4c..1f4972b 100644
--- a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyChannelInitializer.java
+++ b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/tcp/NettyTCPChannelInitializer.java
@@ -16,28 +16,28 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.io.netty.server;
+package org.apache.pulsar.io.netty.tcp;
 
 import io.netty.channel.ChannelInboundHandlerAdapter;
 import io.netty.channel.ChannelInitializer;
-import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.Channel;
 import io.netty.handler.codec.bytes.ByteArrayDecoder;
 
 /**
  * Netty Channel Initializer to register decoder and handler.
  */
-public class NettyChannelInitializer extends ChannelInitializer<SocketChannel> {
+public class NettyTCPChannelInitializer extends ChannelInitializer<Channel> {
 
     private ChannelInboundHandlerAdapter handler;
 
-    public NettyChannelInitializer(ChannelInboundHandlerAdapter handler) {
+    public NettyTCPChannelInitializer(ChannelInboundHandlerAdapter handler) {
         this.handler = handler;
     }
 
     @Override
-    protected void initChannel(SocketChannel socketChannel) throws Exception {
-        socketChannel.pipeline().addLast(new ByteArrayDecoder());
-        socketChannel.pipeline().addLast(this.handler);
+    protected void initChannel(Channel channel) throws Exception {
+        channel.pipeline().addLast(new ByteArrayDecoder());
+        channel.pipeline().addLast(this.handler);
     }
 
 }
diff --git a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyServerHandler.java b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/tcp/NettyTCPServerHandler.java
similarity index 82%
copy from pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyServerHandler.java
copy to pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/tcp/NettyTCPServerHandler.java
index 42f497e..a619bc3 100644
--- a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyServerHandler.java
+++ b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/tcp/NettyTCPServerHandler.java
@@ -16,38 +16,38 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.io.netty.server;
-
-import io.netty.channel.ChannelHandler;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.SimpleChannelInboundHandler;
+package org.apache.pulsar.io.netty.tcp;
 
 import java.io.Serializable;
 import java.util.Optional;
 
-import lombok.Data;
 import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.io.netty.NettySource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import lombok.Data;
+
 /**
  * Handles a server-side channel.
  */
 @ChannelHandler.Sharable
-public class NettyServerHandler extends SimpleChannelInboundHandler<byte[]> {
+public class NettyTCPServerHandler extends SimpleChannelInboundHandler<byte[]> {
 
-    private static final Logger logger = LoggerFactory.getLogger(NettyServerHandler.class);
+    private static final Logger logger = LoggerFactory.getLogger(NettyTCPServerHandler.class);
 
     private NettySource nettySource;
 
-    public NettyServerHandler(NettySource nettySource) {
+    public NettyTCPServerHandler(NettySource nettySource) {
         this.nettySource = nettySource;
     }
 
     @Override
     protected void channelRead0(ChannelHandlerContext channelHandlerContext, byte[] bytes) throws Exception {
-        nettySource.consume(new NettyRecord(Optional.ofNullable(""), bytes));
+        nettySource.consume(new NettyTCPRecord(Optional.ofNullable(""), bytes));
     }
 
     @Override
@@ -57,7 +57,7 @@ public class NettyServerHandler extends SimpleChannelInboundHandler<byte[]> {
     }
 
     @Data
-    static private class NettyRecord implements Record<byte[]>, Serializable {
+    static private class NettyTCPRecord implements Record<byte[]>, Serializable {
         private final Optional<String> key;
         private final byte[] value;
     }
diff --git a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyChannelInitializer.java b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/tcp/package-info.java
similarity index 50%
copy from pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyChannelInitializer.java
copy to pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/tcp/package-info.java
index b9a7b4c..b59d614 100644
--- a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyChannelInitializer.java
+++ b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/tcp/package-info.java
@@ -16,28 +16,4 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.io.netty.server;
-
-import io.netty.channel.ChannelInboundHandlerAdapter;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.socket.SocketChannel;
-import io.netty.handler.codec.bytes.ByteArrayDecoder;
-
-/**
- * Netty Channel Initializer to register decoder and handler.
- */
-public class NettyChannelInitializer extends ChannelInitializer<SocketChannel> {
-
-    private ChannelInboundHandlerAdapter handler;
-
-    public NettyChannelInitializer(ChannelInboundHandlerAdapter handler) {
-        this.handler = handler;
-    }
-
-    @Override
-    protected void initChannel(SocketChannel socketChannel) throws Exception {
-        socketChannel.pipeline().addLast(new ByteArrayDecoder());
-        socketChannel.pipeline().addLast(this.handler);
-    }
-
-}
+package org.apache.pulsar.io.netty.tcp;
\ No newline at end of file
diff --git a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyChannelInitializer.java b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/udp/NettyUDPChannelInitializer.java
similarity index 68%
copy from pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyChannelInitializer.java
copy to pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/udp/NettyUDPChannelInitializer.java
index b9a7b4c..bc86559 100644
--- a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyChannelInitializer.java
+++ b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/udp/NettyUDPChannelInitializer.java
@@ -16,28 +16,26 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.io.netty.server;
+package org.apache.pulsar.io.netty.udp;
 
+import io.netty.channel.Channel;
 import io.netty.channel.ChannelInboundHandlerAdapter;
 import io.netty.channel.ChannelInitializer;
-import io.netty.channel.socket.SocketChannel;
-import io.netty.handler.codec.bytes.ByteArrayDecoder;
 
 /**
  * Netty Channel Initializer to register decoder and handler.
  */
-public class NettyChannelInitializer extends ChannelInitializer<SocketChannel> {
+public class NettyUDPChannelInitializer extends ChannelInitializer<Channel> {
 
     private ChannelInboundHandlerAdapter handler;
 
-    public NettyChannelInitializer(ChannelInboundHandlerAdapter handler) {
+    public NettyUDPChannelInitializer(ChannelInboundHandlerAdapter handler) {
         this.handler = handler;
     }
 
     @Override
-    protected void initChannel(SocketChannel socketChannel) throws Exception {
-        socketChannel.pipeline().addLast(new ByteArrayDecoder());
-        socketChannel.pipeline().addLast(this.handler);
+    protected void initChannel(Channel channel) throws Exception {
+        channel.pipeline().addLast(this.handler);
     }
 
 }
diff --git a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyServerHandler.java b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/udp/NettyUDPServerHandler.java
similarity index 75%
rename from pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyServerHandler.java
rename to pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/udp/NettyUDPServerHandler.java
index 42f497e..8341628 100644
--- a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyServerHandler.java
+++ b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/udp/NettyUDPServerHandler.java
@@ -16,38 +16,40 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.io.netty.server;
-
-import io.netty.channel.ChannelHandler;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.SimpleChannelInboundHandler;
+package org.apache.pulsar.io.netty.udp;
 
 import java.io.Serializable;
 import java.util.Optional;
 
-import lombok.Data;
 import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.io.netty.NettySource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import io.netty.buffer.ByteBufUtil;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.channel.socket.DatagramPacket;
+import lombok.Data;
+
 /**
  * Handles a server-side channel.
  */
 @ChannelHandler.Sharable
-public class NettyServerHandler extends SimpleChannelInboundHandler<byte[]> {
-
-    private static final Logger logger = LoggerFactory.getLogger(NettyServerHandler.class);
+public class NettyUDPServerHandler extends SimpleChannelInboundHandler<DatagramPacket> {
 
+    private static final Logger logger = LoggerFactory.getLogger(NettyUDPServerHandler.class);
     private NettySource nettySource;
 
-    public NettyServerHandler(NettySource nettySource) {
+    public NettyUDPServerHandler(NettySource nettySource) {
         this.nettySource = nettySource;
     }
-
+    
     @Override
-    protected void channelRead0(ChannelHandlerContext channelHandlerContext, byte[] bytes) throws Exception {
-        nettySource.consume(new NettyRecord(Optional.ofNullable(""), bytes));
+    protected void channelRead0(ChannelHandlerContext channelHandlerContext, DatagramPacket packet) throws Exception {
+        byte[] bytes = ByteBufUtil.getBytes(packet.content());
+        nettySource.consume(new NettyUDPRecord(Optional.ofNullable(""), bytes));
     }
 
     @Override
@@ -57,7 +59,7 @@ public class NettyServerHandler extends SimpleChannelInboundHandler<byte[]> {
     }
 
     @Data
-    static private class NettyRecord implements Record<byte[]>, Serializable {
+    static private class NettyUDPRecord implements Record<byte[]>, Serializable {
         private final Optional<String> key;
         private final byte[] value;
     }
diff --git a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyChannelInitializer.java b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/udp/package-info.java
similarity index 50%
copy from pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyChannelInitializer.java
copy to pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/udp/package-info.java
index b9a7b4c..d936c6a 100644
--- a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyChannelInitializer.java
+++ b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/udp/package-info.java
@@ -16,28 +16,4 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.io.netty.server;
-
-import io.netty.channel.ChannelInboundHandlerAdapter;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.socket.SocketChannel;
-import io.netty.handler.codec.bytes.ByteArrayDecoder;
-
-/**
- * Netty Channel Initializer to register decoder and handler.
- */
-public class NettyChannelInitializer extends ChannelInitializer<SocketChannel> {
-
-    private ChannelInboundHandlerAdapter handler;
-
-    public NettyChannelInitializer(ChannelInboundHandlerAdapter handler) {
-        this.handler = handler;
-    }
-
-    @Override
-    protected void initChannel(SocketChannel socketChannel) throws Exception {
-        socketChannel.pipeline().addLast(new ByteArrayDecoder());
-        socketChannel.pipeline().addLast(this.handler);
-    }
-
-}
+package org.apache.pulsar.io.netty.udp;
\ No newline at end of file
diff --git a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyChannelInitializer.java b/pulsar-io/netty/src/main/java/package-info.java
similarity index 50%
rename from pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyChannelInitializer.java
rename to pulsar-io/netty/src/main/java/package-info.java
index b9a7b4c..51da6c0 100644
--- a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyChannelInitializer.java
+++ b/pulsar-io/netty/src/main/java/package-info.java
@@ -16,28 +16,3 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.io.netty.server;
-
-import io.netty.channel.ChannelInboundHandlerAdapter;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.socket.SocketChannel;
-import io.netty.handler.codec.bytes.ByteArrayDecoder;
-
-/**
- * Netty Channel Initializer to register decoder and handler.
- */
-public class NettyChannelInitializer extends ChannelInitializer<SocketChannel> {
-
-    private ChannelInboundHandlerAdapter handler;
-
-    public NettyChannelInitializer(ChannelInboundHandlerAdapter handler) {
-        this.handler = handler;
-    }
-
-    @Override
-    protected void initChannel(SocketChannel socketChannel) throws Exception {
-        socketChannel.pipeline().addLast(new ByteArrayDecoder());
-        socketChannel.pipeline().addLast(this.handler);
-    }
-
-}
diff --git a/pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/server/NettyChannelInitializerTest.java b/pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/tcp/NettyTCPChannelInitializerTest.java
similarity index 73%
copy from pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/server/NettyChannelInitializerTest.java
copy to pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/tcp/NettyTCPChannelInitializerTest.java
index 7fb3b45..366c7b1 100644
--- a/pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/server/NettyChannelInitializerTest.java
+++ b/pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/tcp/NettyTCPChannelInitializerTest.java
@@ -16,30 +16,35 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.io.netty.server;
+package org.apache.pulsar.io.netty.tcp;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
 
-import io.netty.channel.socket.nio.NioSocketChannel;
 import org.apache.pulsar.io.netty.NettySource;
+import org.apache.pulsar.io.netty.tcp.NettyTCPChannelInitializer;
+import org.apache.pulsar.io.netty.tcp.NettyTCPServerHandler;
+import org.apache.pulsar.io.netty.udp.NettyUDPServerHandler;
 import org.testng.annotations.Test;
 
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNotNull;
+import io.netty.channel.socket.nio.NioDatagramChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
 
 /**
  * Tests for Netty Channel Initializer
  */
-public class NettyChannelInitializerTest {
+public class NettyTCPChannelInitializerTest {
 
     @Test
     public void testChannelInitializer() throws Exception {
         NioSocketChannel channel = new NioSocketChannel();
 
-        NettyChannelInitializer nettyChannelInitializer = new NettyChannelInitializer(
-                new NettyServerHandler(new NettySource()));
+        NettyTCPChannelInitializer nettyChannelInitializer = new NettyTCPChannelInitializer(
+                new NettyTCPServerHandler(new NettySource()));
         nettyChannelInitializer.initChannel(channel);
 
         assertNotNull(channel.pipeline().toMap());
         assertEquals(2, channel.pipeline().toMap().size());
     }
-
+    
 }
diff --git a/pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/server/NettyChannelInitializerTest.java b/pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/udp/NettyUDPChannelInitializerTest.java
similarity index 74%
rename from pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/server/NettyChannelInitializerTest.java
rename to pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/udp/NettyUDPChannelInitializerTest.java
index 7fb3b45..33d4940 100644
--- a/pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/server/NettyChannelInitializerTest.java
+++ b/pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/udp/NettyUDPChannelInitializerTest.java
@@ -16,30 +16,31 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.io.netty.server;
+package org.apache.pulsar.io.netty.udp;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
 
-import io.netty.channel.socket.nio.NioSocketChannel;
 import org.apache.pulsar.io.netty.NettySource;
 import org.testng.annotations.Test;
 
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNotNull;
+import io.netty.channel.socket.nio.NioDatagramChannel;
 
 /**
  * Tests for Netty Channel Initializer
  */
-public class NettyChannelInitializerTest {
+public class NettyUDPChannelInitializerTest {
 
     @Test
     public void testChannelInitializer() throws Exception {
-        NioSocketChannel channel = new NioSocketChannel();
+        NioDatagramChannel channel = new NioDatagramChannel();
 
-        NettyChannelInitializer nettyChannelInitializer = new NettyChannelInitializer(
-                new NettyServerHandler(new NettySource()));
+        NettyUDPChannelInitializer nettyChannelInitializer = new NettyUDPChannelInitializer(
+                new NettyUDPServerHandler(new NettySource()));
         nettyChannelInitializer.initChannel(channel);
 
         assertNotNull(channel.pipeline().toMap());
-        assertEquals(2, channel.pipeline().toMap().size());
+        assertEquals(1, channel.pipeline().toMap().size());
     }
-
+    
 }