You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2019/12/14 02:40:48 UTC
[pulsar] branch master updated: [Issue 5827][Issue 5828][netty]
Fixes for UDP protocol support in netty connector (#5829)
This is an automated email from the ASF dual-hosted git repository.
zhaijia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new b52449a [Issue 5827][Issue 5828][netty] Fixes for UDP protocol support in netty connector (#5829)
b52449a is described below
commit b52449a6604cbdc4fe70bdfd8ecc9456d6b42ece
Author: gbensa <58...@users.noreply.github.com>
AuthorDate: Sat Dec 14 03:40:39 2019 +0100
[Issue 5827][Issue 5828][netty] Fixes for UDP protocol support in netty connector (#5829)
### Motivation
UDP protocol is not working for netty connector
### Modifications
Added a specific handler for UDP and use Channel instead of SocketChannel in NettyChannelInitializer
Successfully tested
* Fixes for UDP protocol support in netty connector
* Added specific handlers for UDP and TCP netty connector
---
.../apache/pulsar/io/netty/server/NettyServer.java | 21 +++++++++------
.../NettyTCPChannelInitializer.java} | 14 +++++-----
.../NettyTCPServerHandler.java} | 22 ++++++++--------
.../package-info.java} | 26 +------------------
.../NettyUDPChannelInitializer.java} | 14 +++++-----
.../NettyUDPServerHandler.java} | 30 ++++++++++++----------
.../package-info.java} | 26 +------------------
...tyChannelInitializer.java => package-info.java} | 25 ------------------
.../NettyTCPChannelInitializerTest.java} | 21 +++++++++------
.../NettyUDPChannelInitializerTest.java} | 21 +++++++--------
10 files changed, 79 insertions(+), 141 deletions(-)
diff --git a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyServer.java b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyServer.java
index 775b6f4..60c9c2d 100644
--- a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyServer.java
+++ b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyServer.java
@@ -21,6 +21,17 @@ package org.apache.pulsar.io.netty.server;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.io.netty.NettySource;
+import org.apache.pulsar.io.netty.http.NettyHttpChannelInitializer;
+import org.apache.pulsar.io.netty.http.NettyHttpServerHandler;
+import org.apache.pulsar.io.netty.tcp.NettyTCPChannelInitializer;
+import org.apache.pulsar.io.netty.tcp.NettyTCPServerHandler;
+import org.apache.pulsar.io.netty.udp.NettyUDPChannelInitializer;
+import org.apache.pulsar.io.netty.udp.NettyUDPServerHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
@@ -30,12 +41,6 @@ import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioDatagramChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.pulsar.io.netty.NettySource;
-import org.apache.pulsar.io.netty.http.NettyHttpChannelInitializer;
-import org.apache.pulsar.io.netty.http.NettyHttpServerHandler;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* Netty Server to accept incoming data via the configured type.
@@ -96,7 +101,7 @@ public class NettyServer {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(workerGroup);
bootstrap.channel(NioDatagramChannel.class);
- bootstrap.handler(new NettyChannelInitializer(new NettyServerHandler(this.nettySource)))
+ bootstrap.handler(new NettyUDPChannelInitializer(new NettyUDPServerHandler(this.nettySource)))
.option(ChannelOption.SO_BACKLOG, 1024);
ChannelFuture channelFuture = bootstrap.bind(this.host, this.port).sync();
@@ -105,7 +110,7 @@ public class NettyServer {
private void runTcp() throws InterruptedException {
ServerBootstrap serverBootstrap = getServerBootstrap(
- new NettyChannelInitializer(new NettyServerHandler(this.nettySource)));
+ new NettyTCPChannelInitializer(new NettyTCPServerHandler(this.nettySource)));
ChannelFuture channelFuture = serverBootstrap.bind(this.host, this.port).sync();
channelFuture.channel().closeFuture().sync();
diff --git a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyChannelInitializer.java b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/tcp/NettyTCPChannelInitializer.java
similarity index 72%
copy from pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyChannelInitializer.java
copy to pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/tcp/NettyTCPChannelInitializer.java
index b9a7b4c..1f4972b 100644
--- a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyChannelInitializer.java
+++ b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/tcp/NettyTCPChannelInitializer.java
@@ -16,28 +16,28 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.io.netty.server;
+package org.apache.pulsar.io.netty.tcp;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
-import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.Channel;
import io.netty.handler.codec.bytes.ByteArrayDecoder;
/**
* Netty Channel Initializer to register decoder and handler.
*/
-public class NettyChannelInitializer extends ChannelInitializer<SocketChannel> {
+public class NettyTCPChannelInitializer extends ChannelInitializer<Channel> {
private ChannelInboundHandlerAdapter handler;
- public NettyChannelInitializer(ChannelInboundHandlerAdapter handler) {
+ public NettyTCPChannelInitializer(ChannelInboundHandlerAdapter handler) {
this.handler = handler;
}
@Override
- protected void initChannel(SocketChannel socketChannel) throws Exception {
- socketChannel.pipeline().addLast(new ByteArrayDecoder());
- socketChannel.pipeline().addLast(this.handler);
+ protected void initChannel(Channel channel) throws Exception {
+ channel.pipeline().addLast(new ByteArrayDecoder());
+ channel.pipeline().addLast(this.handler);
}
}
diff --git a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyServerHandler.java b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/tcp/NettyTCPServerHandler.java
similarity index 82%
copy from pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyServerHandler.java
copy to pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/tcp/NettyTCPServerHandler.java
index 42f497e..a619bc3 100644
--- a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyServerHandler.java
+++ b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/tcp/NettyTCPServerHandler.java
@@ -16,38 +16,38 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.io.netty.server;
-
-import io.netty.channel.ChannelHandler;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.SimpleChannelInboundHandler;
+package org.apache.pulsar.io.netty.tcp;
import java.io.Serializable;
import java.util.Optional;
-import lombok.Data;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.netty.NettySource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import lombok.Data;
+
/**
* Handles a server-side channel.
*/
@ChannelHandler.Sharable
-public class NettyServerHandler extends SimpleChannelInboundHandler<byte[]> {
+public class NettyTCPServerHandler extends SimpleChannelInboundHandler<byte[]> {
- private static final Logger logger = LoggerFactory.getLogger(NettyServerHandler.class);
+ private static final Logger logger = LoggerFactory.getLogger(NettyTCPServerHandler.class);
private NettySource nettySource;
- public NettyServerHandler(NettySource nettySource) {
+ public NettyTCPServerHandler(NettySource nettySource) {
this.nettySource = nettySource;
}
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, byte[] bytes) throws Exception {
- nettySource.consume(new NettyRecord(Optional.ofNullable(""), bytes));
+ nettySource.consume(new NettyTCPRecord(Optional.ofNullable(""), bytes));
}
@Override
@@ -57,7 +57,7 @@ public class NettyServerHandler extends SimpleChannelInboundHandler<byte[]> {
}
@Data
- static private class NettyRecord implements Record<byte[]>, Serializable {
+ static private class NettyTCPRecord implements Record<byte[]>, Serializable {
private final Optional<String> key;
private final byte[] value;
}
diff --git a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyChannelInitializer.java b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/tcp/package-info.java
similarity index 50%
copy from pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyChannelInitializer.java
copy to pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/tcp/package-info.java
index b9a7b4c..b59d614 100644
--- a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyChannelInitializer.java
+++ b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/tcp/package-info.java
@@ -16,28 +16,4 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.io.netty.server;
-
-import io.netty.channel.ChannelInboundHandlerAdapter;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.socket.SocketChannel;
-import io.netty.handler.codec.bytes.ByteArrayDecoder;
-
-/**
- * Netty Channel Initializer to register decoder and handler.
- */
-public class NettyChannelInitializer extends ChannelInitializer<SocketChannel> {
-
- private ChannelInboundHandlerAdapter handler;
-
- public NettyChannelInitializer(ChannelInboundHandlerAdapter handler) {
- this.handler = handler;
- }
-
- @Override
- protected void initChannel(SocketChannel socketChannel) throws Exception {
- socketChannel.pipeline().addLast(new ByteArrayDecoder());
- socketChannel.pipeline().addLast(this.handler);
- }
-
-}
+package org.apache.pulsar.io.netty.tcp;
\ No newline at end of file
diff --git a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyChannelInitializer.java b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/udp/NettyUDPChannelInitializer.java
similarity index 68%
copy from pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyChannelInitializer.java
copy to pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/udp/NettyUDPChannelInitializer.java
index b9a7b4c..bc86559 100644
--- a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyChannelInitializer.java
+++ b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/udp/NettyUDPChannelInitializer.java
@@ -16,28 +16,26 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.io.netty.server;
+package org.apache.pulsar.io.netty.udp;
+import io.netty.channel.Channel;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
-import io.netty.channel.socket.SocketChannel;
-import io.netty.handler.codec.bytes.ByteArrayDecoder;
/**
* Netty Channel Initializer to register decoder and handler.
*/
-public class NettyChannelInitializer extends ChannelInitializer<SocketChannel> {
+public class NettyUDPChannelInitializer extends ChannelInitializer<Channel> {
private ChannelInboundHandlerAdapter handler;
- public NettyChannelInitializer(ChannelInboundHandlerAdapter handler) {
+ public NettyUDPChannelInitializer(ChannelInboundHandlerAdapter handler) {
this.handler = handler;
}
@Override
- protected void initChannel(SocketChannel socketChannel) throws Exception {
- socketChannel.pipeline().addLast(new ByteArrayDecoder());
- socketChannel.pipeline().addLast(this.handler);
+ protected void initChannel(Channel channel) throws Exception {
+ channel.pipeline().addLast(this.handler);
}
}
diff --git a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyServerHandler.java b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/udp/NettyUDPServerHandler.java
similarity index 75%
rename from pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyServerHandler.java
rename to pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/udp/NettyUDPServerHandler.java
index 42f497e..8341628 100644
--- a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyServerHandler.java
+++ b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/udp/NettyUDPServerHandler.java
@@ -16,38 +16,40 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.io.netty.server;
-
-import io.netty.channel.ChannelHandler;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.SimpleChannelInboundHandler;
+package org.apache.pulsar.io.netty.udp;
import java.io.Serializable;
import java.util.Optional;
-import lombok.Data;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.netty.NettySource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import io.netty.buffer.ByteBufUtil;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.channel.socket.DatagramPacket;
+import lombok.Data;
+
/**
* Handles a server-side channel.
*/
@ChannelHandler.Sharable
-public class NettyServerHandler extends SimpleChannelInboundHandler<byte[]> {
-
- private static final Logger logger = LoggerFactory.getLogger(NettyServerHandler.class);
+public class NettyUDPServerHandler extends SimpleChannelInboundHandler<DatagramPacket> {
+ private static final Logger logger = LoggerFactory.getLogger(NettyUDPServerHandler.class);
private NettySource nettySource;
- public NettyServerHandler(NettySource nettySource) {
+ public NettyUDPServerHandler(NettySource nettySource) {
this.nettySource = nettySource;
}
-
+
@Override
- protected void channelRead0(ChannelHandlerContext channelHandlerContext, byte[] bytes) throws Exception {
- nettySource.consume(new NettyRecord(Optional.ofNullable(""), bytes));
+ protected void channelRead0(ChannelHandlerContext channelHandlerContext, DatagramPacket packet) throws Exception {
+ byte[] bytes = ByteBufUtil.getBytes(packet.content());
+ nettySource.consume(new NettyUDPRecord(Optional.ofNullable(""), bytes));
}
@Override
@@ -57,7 +59,7 @@ public class NettyServerHandler extends SimpleChannelInboundHandler<byte[]> {
}
@Data
- static private class NettyRecord implements Record<byte[]>, Serializable {
+ static private class NettyUDPRecord implements Record<byte[]>, Serializable {
private final Optional<String> key;
private final byte[] value;
}
diff --git a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyChannelInitializer.java b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/udp/package-info.java
similarity index 50%
copy from pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyChannelInitializer.java
copy to pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/udp/package-info.java
index b9a7b4c..d936c6a 100644
--- a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyChannelInitializer.java
+++ b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/udp/package-info.java
@@ -16,28 +16,4 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.io.netty.server;
-
-import io.netty.channel.ChannelInboundHandlerAdapter;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.socket.SocketChannel;
-import io.netty.handler.codec.bytes.ByteArrayDecoder;
-
-/**
- * Netty Channel Initializer to register decoder and handler.
- */
-public class NettyChannelInitializer extends ChannelInitializer<SocketChannel> {
-
- private ChannelInboundHandlerAdapter handler;
-
- public NettyChannelInitializer(ChannelInboundHandlerAdapter handler) {
- this.handler = handler;
- }
-
- @Override
- protected void initChannel(SocketChannel socketChannel) throws Exception {
- socketChannel.pipeline().addLast(new ByteArrayDecoder());
- socketChannel.pipeline().addLast(this.handler);
- }
-
-}
+package org.apache.pulsar.io.netty.udp;
\ No newline at end of file
diff --git a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyChannelInitializer.java b/pulsar-io/netty/src/main/java/package-info.java
similarity index 50%
rename from pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyChannelInitializer.java
rename to pulsar-io/netty/src/main/java/package-info.java
index b9a7b4c..51da6c0 100644
--- a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyChannelInitializer.java
+++ b/pulsar-io/netty/src/main/java/package-info.java
@@ -16,28 +16,3 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.io.netty.server;
-
-import io.netty.channel.ChannelInboundHandlerAdapter;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.socket.SocketChannel;
-import io.netty.handler.codec.bytes.ByteArrayDecoder;
-
-/**
- * Netty Channel Initializer to register decoder and handler.
- */
-public class NettyChannelInitializer extends ChannelInitializer<SocketChannel> {
-
- private ChannelInboundHandlerAdapter handler;
-
- public NettyChannelInitializer(ChannelInboundHandlerAdapter handler) {
- this.handler = handler;
- }
-
- @Override
- protected void initChannel(SocketChannel socketChannel) throws Exception {
- socketChannel.pipeline().addLast(new ByteArrayDecoder());
- socketChannel.pipeline().addLast(this.handler);
- }
-
-}
diff --git a/pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/server/NettyChannelInitializerTest.java b/pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/tcp/NettyTCPChannelInitializerTest.java
similarity index 73%
copy from pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/server/NettyChannelInitializerTest.java
copy to pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/tcp/NettyTCPChannelInitializerTest.java
index 7fb3b45..366c7b1 100644
--- a/pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/server/NettyChannelInitializerTest.java
+++ b/pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/tcp/NettyTCPChannelInitializerTest.java
@@ -16,30 +16,35 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.io.netty.server;
+package org.apache.pulsar.io.netty.tcp;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
-import io.netty.channel.socket.nio.NioSocketChannel;
import org.apache.pulsar.io.netty.NettySource;
+import org.apache.pulsar.io.netty.tcp.NettyTCPChannelInitializer;
+import org.apache.pulsar.io.netty.tcp.NettyTCPServerHandler;
+import org.apache.pulsar.io.netty.udp.NettyUDPServerHandler;
import org.testng.annotations.Test;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNotNull;
+import io.netty.channel.socket.nio.NioDatagramChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
/**
* Tests for Netty Channel Initializer
*/
-public class NettyChannelInitializerTest {
+public class NettyTCPChannelInitializerTest {
@Test
public void testChannelInitializer() throws Exception {
NioSocketChannel channel = new NioSocketChannel();
- NettyChannelInitializer nettyChannelInitializer = new NettyChannelInitializer(
- new NettyServerHandler(new NettySource()));
+ NettyTCPChannelInitializer nettyChannelInitializer = new NettyTCPChannelInitializer(
+ new NettyTCPServerHandler(new NettySource()));
nettyChannelInitializer.initChannel(channel);
assertNotNull(channel.pipeline().toMap());
assertEquals(2, channel.pipeline().toMap().size());
}
-
+
}
diff --git a/pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/server/NettyChannelInitializerTest.java b/pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/udp/NettyUDPChannelInitializerTest.java
similarity index 74%
rename from pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/server/NettyChannelInitializerTest.java
rename to pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/udp/NettyUDPChannelInitializerTest.java
index 7fb3b45..33d4940 100644
--- a/pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/server/NettyChannelInitializerTest.java
+++ b/pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/udp/NettyUDPChannelInitializerTest.java
@@ -16,30 +16,31 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.io.netty.server;
+package org.apache.pulsar.io.netty.udp;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
-import io.netty.channel.socket.nio.NioSocketChannel;
import org.apache.pulsar.io.netty.NettySource;
import org.testng.annotations.Test;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNotNull;
+import io.netty.channel.socket.nio.NioDatagramChannel;
/**
* Tests for Netty Channel Initializer
*/
-public class NettyChannelInitializerTest {
+public class NettyUDPChannelInitializerTest {
@Test
public void testChannelInitializer() throws Exception {
- NioSocketChannel channel = new NioSocketChannel();
+ NioDatagramChannel channel = new NioDatagramChannel();
- NettyChannelInitializer nettyChannelInitializer = new NettyChannelInitializer(
- new NettyServerHandler(new NettySource()));
+ NettyUDPChannelInitializer nettyChannelInitializer = new NettyUDPChannelInitializer(
+ new NettyUDPServerHandler(new NettySource()));
nettyChannelInitializer.initChannel(channel);
assertNotNull(channel.pipeline().toMap());
- assertEquals(2, channel.pipeline().toMap().size());
+ assertEquals(1, channel.pipeline().toMap().size());
}
-
+
}