You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by dk...@apache.org on 2020/05/19 15:46:16 UTC

[avro] branch AVRO-2806 created (now 527ec67)

This is an automated email from the ASF dual-hosted git repository.

dkulp pushed a change to branch AVRO-2806
in repository https://gitbox.apache.org/repos/asf/avro.git.


      at 527ec67  [AVRO-2806] Upgrade from Netty 3 -> Netty 4

This branch includes the following new commits:

     new 527ec67  [AVRO-2806] Upgrade from Netty 3 -> Netty 4

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[avro] 01/01: [AVRO-2806] Upgrade from Netty 3 -> Netty 4

Posted by dk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dkulp pushed a commit to branch AVRO-2806
in repository https://gitbox.apache.org/repos/asf/avro.git

commit 527ec67a8a4f66a1b7f1e9adfeefbe5e6a17f271
Author: Daniel Kulp <dk...@apache.org>
AuthorDate: Tue May 19 11:00:44 2020 -0400

    [AVRO-2806] Upgrade from Netty 3 -> Netty 4
---
 lang/java/grpc/pom.xml                             |   5 -
 lang/java/ipc-netty/pom.xml                        |  22 ++-
 .../org/apache/avro/ipc/netty/NettyServer.java     | 174 +++++++----------
 .../apache/avro/ipc/netty/NettyTransceiver.java    | 210 ++++++++-------------
 .../apache/avro/ipc/netty/NettyTransportCodec.java |  44 ++---
 .../org/apache/avro/ipc/netty/TestNettyServer.java |  58 +++---
 .../netty/TestNettyServerConcurrentExecution.java  |   9 +-
 .../ipc/netty/TestNettyServerWithCallbacks.java    |   5 +-
 .../ipc/netty/TestNettyServerWithCompression.java  |  75 ++------
 .../avro/ipc/netty/TestNettyServerWithSSL.java     | 113 ++++-------
 ...=> TestNettyTransceiverWhenFailsToConnect.java} |  43 ++---
 .../netty/TestNettyTransceiverWhenServerStops.java |   2 +-
 .../apache/avro/ipc/netty/TestProtocolNetty.java   |   2 +-
 .../org/apache/avro/ipc/{ => netty}/servercert.p12 | Bin
 lang/java/pom.xml                                  |  13 +-
 15 files changed, 290 insertions(+), 485 deletions(-)

diff --git a/lang/java/grpc/pom.xml b/lang/java/grpc/pom.xml
index ffe9d27..5a3b7c6 100644
--- a/lang/java/grpc/pom.xml
+++ b/lang/java/grpc/pom.xml
@@ -130,11 +130,6 @@
       <scope>test</scope>
     </dependency>
     <dependency>
-      <groupId>io.netty</groupId>
-      <artifactId>netty-codec-http2</artifactId>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
       <groupId>${project.groupId}</groupId>
       <artifactId>avro</artifactId>
       <version>${project.version}</version>
diff --git a/lang/java/ipc-netty/pom.xml b/lang/java/ipc-netty/pom.xml
index a6df8de..8c1184c 100644
--- a/lang/java/ipc-netty/pom.xml
+++ b/lang/java/ipc-netty/pom.xml
@@ -39,11 +39,10 @@
     <osgi.import>
       !org.apache.avro.ipc.netty*,
       org.apache.avro*;version="${project.version}",
-      org.jboss.netty*,
-      org.apache.velocity*;resolution:=optional,
+      io.netty*,
       *
     </osgi.import>
-    <osgi.export>org.apache.avro.ipc*;version="${project.version}"</osgi.export>
+    <osgi.export>org.apache.avro.ipc.netty*;version="${project.version}"</osgi.export>
   </properties>
 
   <build>
@@ -72,6 +71,17 @@
     </plugins>
   </build>
 
+  <dependencyManagement>
+    <dependencies>
+      <dependency>
+        <groupId>io.netty</groupId>
+        <artifactId>netty-bom</artifactId>
+        <version>${netty.version}</version>
+        <type>pom</type>
+        <scope>import</scope>
+      </dependency>
+    </dependencies>
+  </dependencyManagement>
   <dependencies>
     <dependency>
       <groupId>${project.groupId}</groupId>
@@ -105,7 +115,11 @@
     </dependency>
     <dependency>
       <groupId>io.netty</groupId>
-      <artifactId>netty</artifactId>
+      <artifactId>netty-buffer</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>io.netty</groupId>
+      <artifactId>netty-handler</artifactId>
     </dependency>
 
 
diff --git a/lang/java/ipc-netty/src/main/java/org/apache/avro/ipc/netty/NettyServer.java b/lang/java/ipc-netty/src/main/java/org/apache/avro/ipc/netty/NettyServer.java
index a098afb..0c0189e 100644
--- a/lang/java/ipc-netty/src/main/java/org/apache/avro/ipc/netty/NettyServer.java
+++ b/lang/java/ipc-netty/src/main/java/org/apache/avro/ipc/netty/NettyServer.java
@@ -23,30 +23,26 @@ import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
 
 import org.apache.avro.ipc.Responder;
 import org.apache.avro.ipc.Server;
 import org.apache.avro.ipc.netty.NettyTransportCodec.NettyDataPack;
 import org.apache.avro.ipc.netty.NettyTransportCodec.NettyFrameDecoder;
 import org.apache.avro.ipc.netty.NettyTransportCodec.NettyFrameEncoder;
-import org.jboss.netty.bootstrap.ServerBootstrap;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelEvent;
-import org.jboss.netty.channel.ChannelFactory;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.ChannelPipelineFactory;
-import org.jboss.netty.channel.ChannelStateEvent;
-import org.jboss.netty.channel.Channels;
-import org.jboss.netty.channel.ExceptionEvent;
-import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
-import org.jboss.netty.channel.group.ChannelGroup;
-import org.jboss.netty.channel.group.ChannelGroupFuture;
-import org.jboss.netty.channel.group.DefaultChannelGroup;
-import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
-import org.jboss.netty.handler.execution.ExecutionHandler;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.DefaultEventLoopGroup;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -59,55 +55,32 @@ public class NettyServer implements Server {
   private final Responder responder;
 
   private final Channel serverChannel;
-  private final ChannelGroup allChannels = new DefaultChannelGroup("avro-netty-server");
-  private final ChannelFactory channelFactory;
+  private final EventLoopGroup bossGroup = new NioEventLoopGroup(1);
+  private final EventLoopGroup workerGroup = new NioEventLoopGroup(10);
+  private final EventLoopGroup callerGroup = new DefaultEventLoopGroup(16);
   private final CountDownLatch closed = new CountDownLatch(1);
+  private final AtomicInteger activeCount = new AtomicInteger(0);
 
-  public NettyServer(Responder responder, InetSocketAddress addr) {
-    this(responder, addr,
-        new NioServerSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool()));
-  }
-
-  public NettyServer(Responder responder, InetSocketAddress addr, ChannelFactory channelFactory) {
-    this(responder, addr, channelFactory, null);
+  public NettyServer(Responder responder, InetSocketAddress addr) throws InterruptedException {
+    this(responder, addr, null);
   }
 
-  /**
-   * @param executionHandler if not null, will be inserted into the Netty
-   *                         pipeline. Use this when your responder does long,
-   *                         non-cpu bound processing (see Netty's
-   *                         ExecutionHandler javadoc).
-   * @param pipelineFactory  Avro-related handlers will be added on top of what
-   *                         this factory creates
-   */
-  public NettyServer(Responder responder, InetSocketAddress addr, ChannelFactory channelFactory,
-      final ChannelPipelineFactory pipelineFactory, final ExecutionHandler executionHandler) {
+  public NettyServer(Responder responder, InetSocketAddress addr, final Consumer<SocketChannel> initializer)
+      throws InterruptedException {
     this.responder = responder;
-    this.channelFactory = channelFactory;
-    ServerBootstrap bootstrap = new ServerBootstrap(channelFactory);
-    bootstrap.setPipelineFactory(() -> {
-      ChannelPipeline p = pipelineFactory.getPipeline();
-      p.addLast("frameDecoder", new NettyFrameDecoder());
-      p.addLast("frameEncoder", new NettyFrameEncoder());
-      if (executionHandler != null) {
-        p.addLast("executionHandler", executionHandler);
-      }
-      p.addLast("handler", new NettyServerAvroHandler());
-      return p;
-    });
-    serverChannel = bootstrap.bind(addr);
-    allChannels.add(serverChannel);
-  }
-
-  /**
-   * @param executionHandler if not null, will be inserted into the Netty
-   *                         pipeline. Use this when your responder does long,
-   *                         non-cpu bound processing (see Netty's
-   *                         ExecutionHandler javadoc).
-   */
-  public NettyServer(Responder responder, InetSocketAddress addr, ChannelFactory channelFactory,
-      final ExecutionHandler executionHandler) {
-    this(responder, addr, channelFactory, Channels::pipeline, executionHandler);
+    ServerBootstrap bootstrap = new ServerBootstrap().group(bossGroup, workerGroup)
+        .channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {
+          @Override
+          public void initChannel(SocketChannel ch) throws Exception {
+            if (initializer != null) {
+              initializer.accept(ch);
+            }
+            ch.pipeline().addLast("frameDecoder", new NettyFrameDecoder())
+                .addLast("frameEncoder", new NettyFrameEncoder()).addLast("handler", new NettyServerAvroHandler());
+          }
+        }).option(ChannelOption.SO_BACKLOG, 128).childOption(ChannelOption.SO_KEEPALIVE, true);
+
+    serverChannel = bootstrap.bind(addr).sync().channel();
   }
 
   @Override
@@ -117,15 +90,20 @@ public class NettyServer implements Server {
 
   @Override
   public void close() {
-    ChannelGroupFuture future = allChannels.close();
-    future.awaitUninterruptibly();
-    channelFactory.releaseExternalResources();
+    workerGroup.shutdownGracefully().syncUninterruptibly();
+    bossGroup.shutdownGracefully().syncUninterruptibly();
+    try {
+      serverChannel.closeFuture().sync();
+    } catch (InterruptedException e) {
+      // TODO Auto-generated catch block
+      e.printStackTrace();
+    }
     closed.countDown();
   }
 
   @Override
   public int getPort() {
-    return ((InetSocketAddress) serverChannel.getLocalAddress()).getPort();
+    return ((InetSocketAddress) serverChannel.localAddress()).getPort();
   }
 
   @Override
@@ -138,61 +116,53 @@ public class NettyServer implements Server {
    * @return The number of clients currently connected to this server.
    */
   public int getNumActiveConnections() {
-    // allChannels also contains the server channel, so exclude that from the
-    // count.
-    return allChannels.size() - 1;
+    return activeCount.get();
   }
 
   /**
    * Avro server handler for the Netty transport
    */
-  class NettyServerAvroHandler extends SimpleChannelUpstreamHandler {
+  class NettyServerAvroHandler extends SimpleChannelInboundHandler<NettyDataPack> {
 
     private NettyTransceiver connectionMetadata = new NettyTransceiver();
 
     @Override
-    public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e) throws Exception {
-      if (e instanceof ChannelStateEvent) {
-        LOG.info(e.toString());
-      }
-      super.handleUpstream(ctx, e);
-    }
-
-    @Override
-    public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
-      allChannels.add(e.getChannel());
-      super.channelOpen(ctx, e);
+    public void channelActive(ChannelHandlerContext ctx) throws Exception {
+      activeCount.incrementAndGet();
+      super.channelActive(ctx);
     }
 
     @Override
-    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
-      try {
-        NettyDataPack dataPack = (NettyDataPack) e.getMessage();
-        List<ByteBuffer> req = dataPack.getDatas();
-        List<ByteBuffer> res = responder.respond(req, connectionMetadata);
-        // response will be null for oneway messages.
-        if (res != null) {
-          dataPack.setDatas(res);
-          e.getChannel().write(dataPack);
+    protected void channelRead0(ChannelHandlerContext ctx, final NettyDataPack dataPack) throws Exception {
+      callerGroup.submit(new Runnable() {
+        @Override
+        public void run() {
+          List<ByteBuffer> req = dataPack.getDatas();
+          try {
+            List<ByteBuffer> res = responder.respond(req, connectionMetadata);
+            // response will be null for oneway messages.
+            if (res != null) {
+              dataPack.setDatas(res);
+              ctx.channel().writeAndFlush(dataPack);
+            }
+          } catch (IOException e) {
+            LOG.warn("unexpected error");
+          }
         }
-      } catch (IOException ex) {
-        LOG.warn("unexpected error");
-      }
+      });
     }
 
     @Override
-    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
-      LOG.warn("Unexpected exception from downstream.", e.getCause());
-      e.getChannel().close();
-      allChannels.remove(e.getChannel());
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable e) {
+      LOG.warn("Unexpected exception from downstream.", e);
+      ctx.close().syncUninterruptibly();
     }
 
     @Override
-    public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
-      LOG.info("Connection to {} disconnected.", e.getChannel().getRemoteAddress());
-      super.channelClosed(ctx, e);
-      e.getChannel().close();
-      allChannels.remove(e.getChannel());
+    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+      LOG.info("Connection to {} disconnected.", ctx.channel().remoteAddress());
+      activeCount.decrementAndGet();
+      super.channelInactive(ctx);
     }
 
   }
diff --git a/lang/java/ipc-netty/src/main/java/org/apache/avro/ipc/netty/NettyTransceiver.java b/lang/java/ipc-netty/src/main/java/org/apache/avro/ipc/netty/NettyTransceiver.java
index 73e46f1..d3d3150 100644
--- a/lang/java/ipc-netty/src/main/java/org/apache/avro/ipc/netty/NettyTransceiver.java
+++ b/lang/java/ipc-netty/src/main/java/org/apache/avro/ipc/netty/NettyTransceiver.java
@@ -21,16 +21,14 @@ package org.apache.avro.ipc.netty;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Consumer;
 
 import org.apache.avro.Protocol;
 import org.apache.avro.ipc.CallFuture;
@@ -39,22 +37,20 @@ import org.apache.avro.ipc.Transceiver;
 import org.apache.avro.ipc.netty.NettyTransportCodec.NettyDataPack;
 import org.apache.avro.ipc.netty.NettyTransportCodec.NettyFrameDecoder;
 import org.apache.avro.ipc.netty.NettyTransportCodec.NettyFrameEncoder;
-import org.jboss.netty.bootstrap.ClientBootstrap;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelEvent;
-import org.jboss.netty.channel.ChannelFactory;
-import org.jboss.netty.channel.ChannelFuture;
-import org.jboss.netty.channel.ChannelFutureListener;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.ChannelState;
-import org.jboss.netty.channel.ChannelStateEvent;
-import org.jboss.netty.channel.ChannelUpstreamHandler;
-import org.jboss.netty.channel.Channels;
-import org.jboss.netty.channel.ExceptionEvent;
-import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
-import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandler;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -63,7 +59,7 @@ import org.slf4j.LoggerFactory;
  */
 public class NettyTransceiver extends Transceiver {
   /** If not specified, the default connection timeout will be used (60 sec). */
-  public static final long DEFAULT_CONNECTION_TIMEOUT_MILLIS = 60 * 1000L;
+  public static final int DEFAULT_CONNECTION_TIMEOUT_MILLIS = 60 * 1000;
   public static final String NETTY_CONNECT_TIMEOUT_OPTION = "connectTimeoutMillis";
   public static final String NETTY_TCP_NODELAY_OPTION = "tcpNoDelay";
   public static final String NETTY_KEEPALIVE_OPTION = "keepAlive";
@@ -74,10 +70,10 @@ public class NettyTransceiver extends Transceiver {
   private final AtomicInteger serialGenerator = new AtomicInteger(0);
   private final Map<Integer, Callback<List<ByteBuffer>>> requests = new ConcurrentHashMap<>();
 
-  private final ChannelFactory channelFactory;
-  private final long connectTimeoutMillis;
-  private final ClientBootstrap bootstrap;
+  private final Integer connectTimeoutMillis;
+  private final Bootstrap bootstrap;
   private final InetSocketAddress remoteAddr;
+  private final EventLoopGroup workerGroup = new NioEventLoopGroup(new NettyTransceiverThreadFactory("avro"));
 
   volatile ChannelFuture channelFuture;
   volatile boolean stopping;
@@ -92,8 +88,7 @@ public class NettyTransceiver extends Transceiver {
   private Protocol remote; // Synchronized on stateLock
 
   NettyTransceiver() {
-    channelFactory = null;
-    connectTimeoutMillis = 0L;
+    connectTimeoutMillis = 0;
     bootstrap = null;
     remoteAddr = null;
     channelFuture = null;
@@ -120,84 +115,56 @@ public class NettyTransceiver extends Transceiver {
    *                             {@link #DEFAULT_CONNECTION_TIMEOUT_MILLIS}.
    * @throws IOException if an error occurs connecting to the given address.
    */
-  public NettyTransceiver(InetSocketAddress addr, Long connectTimeoutMillis) throws IOException {
-    this(addr,
-        new NioClientSocketChannelFactory(
-            Executors.newCachedThreadPool(
-                new NettyTransceiverThreadFactory("Avro " + NettyTransceiver.class.getSimpleName() + " Boss")),
-            Executors.newCachedThreadPool(
-                new NettyTransceiverThreadFactory("Avro " + NettyTransceiver.class.getSimpleName() + " I/O Worker"))),
-        connectTimeoutMillis);
+  public NettyTransceiver(InetSocketAddress addr, Integer connectTimeoutMillis) throws IOException {
+    this(addr, connectTimeoutMillis, null);
   }
 
   /**
    * Creates a NettyTransceiver, and attempts to connect to the given address.
-   * {@link #DEFAULT_CONNECTION_TIMEOUT_MILLIS} is used for the connection
-   * timeout.
    * 
-   * @param addr           the address to connect to.
-   * @param channelFactory the factory to use to create a new Netty Channel.
+   * @param addr        the address to connect to.
+   * @param initializer Consumer function to apply initial setup to the
+   *                    SocketChannel. Usable to set things like SSL
+   *                    requirements, compression, etc...
    * @throws IOException if an error occurs connecting to the given address.
    */
-  public NettyTransceiver(InetSocketAddress addr, ChannelFactory channelFactory) throws IOException {
-    this(addr, channelFactory, buildDefaultBootstrapOptions(null));
+  public NettyTransceiver(InetSocketAddress addr, final Consumer<SocketChannel> initializer) throws IOException {
+    this(addr, DEFAULT_CONNECTION_TIMEOUT_MILLIS, initializer);
   }
 
   /**
    * Creates a NettyTransceiver, and attempts to connect to the given address.
    * 
    * @param addr                 the address to connect to.
-   * @param channelFactory       the factory to use to create a new Netty Channel.
    * @param connectTimeoutMillis maximum amount of time to wait for connection
    *                             establishment in milliseconds, or null to use
    *                             {@link #DEFAULT_CONNECTION_TIMEOUT_MILLIS}.
+   * @param initializer          Consumer function to apply initial setup to the
+   *                             SocketChannel. Usable to set things like SSL
+   *                             requirements, compression, etc...
    * @throws IOException if an error occurs connecting to the given address.
    */
-  public NettyTransceiver(InetSocketAddress addr, ChannelFactory channelFactory, Long connectTimeoutMillis)
-      throws IOException {
-    this(addr, channelFactory, buildDefaultBootstrapOptions(connectTimeoutMillis));
-  }
-
-  /**
-   * Creates a NettyTransceiver, and attempts to connect to the given address. It
-   * is strongly recommended that the {@link #NETTY_CONNECT_TIMEOUT_OPTION} option
-   * be set to a reasonable timeout value (a Long value in milliseconds) to
-   * prevent connect/disconnect attempts from hanging indefinitely. It is also
-   * recommended that the {@link #NETTY_TCP_NODELAY_OPTION} option be set to true
-   * to minimize RPC latency.
-   * 
-   * @param addr                        the address to connect to.
-   * @param channelFactory              the factory to use to create a new Netty
-   *                                    Channel.
-   * @param nettyClientBootstrapOptions map of Netty ClientBootstrap options to
-   *                                    use.
-   * @throws IOException          if an error occurs connecting to the given
-   *                              address.
-   * @throws NullPointerException if {@code channelFactory} is {@code null}
-   */
-  public NettyTransceiver(InetSocketAddress addr, ChannelFactory channelFactory,
-      Map<String, Object> nettyClientBootstrapOptions) throws IOException {
-    Objects.requireNonNull(channelFactory, "channelFactory cannot be null");
-
+  public NettyTransceiver(InetSocketAddress addr, Integer connectTimeoutMillis,
+      final Consumer<SocketChannel> initializer) throws IOException {
     // Set up.
-    this.channelFactory = channelFactory;
-    this.connectTimeoutMillis = (Long) nettyClientBootstrapOptions.get(NETTY_CONNECT_TIMEOUT_OPTION);
-    bootstrap = new ClientBootstrap(channelFactory);
-    remoteAddr = addr;
-
-    // Configure the event pipeline factory.
-    bootstrap.setPipelineFactory(() -> {
-      ChannelPipeline p = Channels.pipeline();
-      p.addLast("frameDecoder", new NettyFrameDecoder());
-      p.addLast("frameEncoder", new NettyFrameEncoder());
-      p.addLast("handler", createNettyClientAvroHandler());
-      return p;
-    });
-
-    if (nettyClientBootstrapOptions != null) {
-      LOG.debug("Using Netty bootstrap options: {}", nettyClientBootstrapOptions);
-      bootstrap.setOptions(nettyClientBootstrapOptions);
+    if (connectTimeoutMillis == null) {
+      connectTimeoutMillis = DEFAULT_CONNECTION_TIMEOUT_MILLIS;
     }
+    this.connectTimeoutMillis = connectTimeoutMillis;
+    bootstrap = new Bootstrap().group(workerGroup).channel(NioSocketChannel.class)
+        .option(ChannelOption.SO_KEEPALIVE, true).option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeoutMillis)
+        .option(ChannelOption.TCP_NODELAY, DEFAULT_TCP_NODELAY_VALUE).handler(new ChannelInitializer<SocketChannel>() {
+          @Override
+          public void initChannel(SocketChannel ch) throws Exception {
+            if (initializer != null) {
+              initializer.accept(ch);
+            }
+            ch.pipeline().addLast("frameDecoder", new NettyFrameDecoder())
+                .addLast("frameEncoder", new NettyFrameEncoder()).addLast("handler", createNettyClientAvroHandler());
+          }
+        });
+
+    remoteAddr = addr;
 
     // Make a new connection.
     stateLock.readLock().lock();
@@ -206,7 +173,7 @@ public class NettyTransceiver extends Transceiver {
     } catch (Throwable e) {
       // must attempt to clean up any allocated channel future
       if (channelFuture != null) {
-        channelFuture.getChannel().close();
+        channelFuture.channel().close();
       }
 
       if (e instanceof IOException)
@@ -226,33 +193,17 @@ public class NettyTransceiver extends Transceiver {
    * 
    * @return the ChannelUpstreamHandler to use.
    */
-  protected ChannelUpstreamHandler createNettyClientAvroHandler() {
+  protected ChannelInboundHandler createNettyClientAvroHandler() {
     return new NettyClientAvroHandler();
   }
 
   /**
-   * Creates the default options map for the Netty ClientBootstrap.
-   * 
-   * @param connectTimeoutMillis connection timeout in milliseconds, or null if no
-   *                             timeout is desired.
-   * @return the map of Netty bootstrap options.
-   */
-  protected static Map<String, Object> buildDefaultBootstrapOptions(Long connectTimeoutMillis) {
-    Map<String, Object> options = new HashMap<>(3);
-    options.put(NETTY_TCP_NODELAY_OPTION, DEFAULT_TCP_NODELAY_VALUE);
-    options.put(NETTY_KEEPALIVE_OPTION, true);
-    options.put(NETTY_CONNECT_TIMEOUT_OPTION,
-        connectTimeoutMillis == null ? DEFAULT_CONNECTION_TIMEOUT_MILLIS : connectTimeoutMillis);
-    return options;
-  }
-
-  /**
    * Tests whether the given channel is ready for writing.
    * 
    * @return true if the channel is open and ready; false otherwise.
    */
   private static boolean isChannelReady(Channel channel) {
-    return (channel != null) && channel.isOpen() && channel.isBound() && channel.isConnected();
+    return (channel != null) && channel.isOpen() && channel.isActive();
   }
 
   /**
@@ -287,9 +238,10 @@ public class NettyTransceiver extends Transceiver {
 
             synchronized (channelFutureLock) {
               if (!channelFuture.isSuccess()) {
-                throw new IOException("Error connecting to " + remoteAddr, channelFuture.getCause());
+                remote = null;
+                throw new IOException("Error connecting to " + remoteAddr, channelFuture.cause());
               }
-              channel = channelFuture.getChannel();
+              channel = channelFuture.channel();
               channelFuture = null;
             }
           }
@@ -326,7 +278,7 @@ public class NettyTransceiver extends Transceiver {
       }
     }
     if (channelFutureToCancel != null) {
-      channelFutureToCancel.cancel();
+      channelFutureToCancel.cancel(true);
     }
 
     if (stateReadLockHeld) {
@@ -420,7 +372,7 @@ public class NettyTransceiver extends Transceiver {
       disconnect(awaitCompletion, true, null);
     } finally {
       // Shut down all thread pools to exit.
-      channelFactory.releaseExternalResources();
+      workerGroup.shutdownGracefully();
     }
   }
 
@@ -428,7 +380,7 @@ public class NettyTransceiver extends Transceiver {
   public String getRemoteName() throws IOException {
     stateLock.readLock().lock();
     try {
-      return getChannel().getRemoteAddress().toString();
+      return getChannel().remoteAddress().toString();
     } finally {
       stateLock.readLock().unlock();
     }
@@ -481,7 +433,7 @@ public class NettyTransceiver extends Transceiver {
       }
     }
     if (!writeFuture.isSuccess()) {
-      throw new IOException("Error writing buffers", writeFuture.getCause());
+      throw new IOException("Error writing buffers", writeFuture.cause());
     }
   }
 
@@ -494,7 +446,7 @@ public class NettyTransceiver extends Transceiver {
    * @throws IOException if an error occurs connecting to the remote peer.
    */
   private ChannelFuture writeDataPack(NettyDataPack dataPack) throws IOException {
-    return getChannel().write(dataPack);
+    return getChannel().writeAndFlush(dataPack);
   }
 
   @Override
@@ -552,7 +504,7 @@ public class NettyTransceiver extends Transceiver {
     @Override
     public void operationComplete(ChannelFuture future) throws Exception {
       if (!future.isSuccess() && (callback != null)) {
-        callback.handleError(new IOException("Error writing buffers", future.getCause()));
+        callback.handleError(new IOException("Error writing buffers", future.cause()));
       }
     }
   }
@@ -560,31 +512,10 @@ public class NettyTransceiver extends Transceiver {
   /**
    * Avro client handler for the Netty transport
    */
-  protected class NettyClientAvroHandler extends SimpleChannelUpstreamHandler {
-
-    @Override
-    public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e) throws Exception {
-      if (e instanceof ChannelStateEvent) {
-        LOG.debug("{}", e);
-        ChannelStateEvent cse = (ChannelStateEvent) e;
-        if ((cse.getState() == ChannelState.OPEN) && (Boolean.FALSE.equals(cse.getValue()))) {
-          // Server closed connection; disconnect client side
-          LOG.debug("Remote peer {} closed connection", remoteAddr);
-          disconnect(false, true, null);
-        }
-      }
-      super.handleUpstream(ctx, e);
-    }
-
-    @Override
-    public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
-      // channel = e.getChannel();
-      super.channelOpen(ctx, e);
-    }
+  protected class NettyClientAvroHandler extends SimpleChannelInboundHandler<NettyDataPack> {
 
     @Override
-    public void messageReceived(ChannelHandlerContext ctx, final MessageEvent e) {
-      NettyDataPack dataPack = (NettyDataPack) e.getMessage();
+    protected void channelRead0(ChannelHandlerContext ctx, NettyDataPack dataPack) throws Exception {
       Callback<List<ByteBuffer>> callback = requests.get(dataPack.getSerial());
       if (callback == null) {
         throw new RuntimeException("Missing previous call info");
@@ -597,8 +528,17 @@ public class NettyTransceiver extends Transceiver {
     }
 
     @Override
-    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
-      disconnect(false, true, e.getCause());
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable e) {
+      disconnect(false, true, e);
+    }
+
+    @Override
+    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+      if (!ctx.channel().isOpen()) {
+        LOG.info("Connection to {} disconnected.", ctx.channel().remoteAddress());
+        disconnect(false, true, null);
+      }
+      super.channelInactive(ctx);
     }
 
   }
diff --git a/lang/java/ipc-netty/src/main/java/org/apache/avro/ipc/netty/NettyTransportCodec.java b/lang/java/ipc-netty/src/main/java/org/apache/avro/ipc/netty/NettyTransportCodec.java
index cd006a7..c293808 100644
--- a/lang/java/ipc-netty/src/main/java/org/apache/avro/ipc/netty/NettyTransportCodec.java
+++ b/lang/java/ipc-netty/src/main/java/org/apache/avro/ipc/netty/NettyTransportCodec.java
@@ -18,17 +18,17 @@
 
 package org.apache.avro.ipc.netty;
 
+import static io.netty.buffer.Unpooled.wrappedBuffer;
+
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 import org.apache.avro.AvroRuntimeException;
 
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.buffer.ChannelBuffers;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.handler.codec.frame.FrameDecoder;
-import org.jboss.netty.handler.codec.oneone.OneToOneEncoder;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.ByteToMessageDecoder;
+import io.netty.handler.codec.MessageToMessageEncoder;
 
 /**
  * Data structure, encoder and decoder classes for the Netty transport.
@@ -71,7 +71,7 @@ public class NettyTransportCodec {
    * Protocol encoder which converts NettyDataPack which contains the Responder's
    * output List&lt;ByteBuffer&gt; to ChannelBuffer needed by Netty.
    */
-  public static class NettyFrameEncoder extends OneToOneEncoder {
+  public static class NettyFrameEncoder extends MessageToMessageEncoder<NettyDataPack> {
 
     /**
      * encode msg to ChannelBuffer
@@ -81,8 +81,7 @@ public class NettyTransportCodec {
      * @return encoded ChannelBuffer
      */
     @Override
-    protected Object encode(ChannelHandlerContext ctx, Channel channel, Object msg) throws Exception {
-      NettyDataPack dataPack = (NettyDataPack) msg;
+    protected void encode(ChannelHandlerContext ctx, NettyDataPack dataPack, List<Object> out) throws Exception {
       List<ByteBuffer> origs = dataPack.getDatas();
       List<ByteBuffer> bbs = new ArrayList<>(origs.size() * 2 + 1);
       bbs.add(getPackHeader(dataPack)); // prepend a pack header including serial number and list size
@@ -90,8 +89,7 @@ public class NettyTransportCodec {
         bbs.add(getLengthHeader(b)); // for each buffer prepend length field
         bbs.add(b);
       }
-
-      return ChannelBuffers.wrappedBuffer(bbs.toArray(new ByteBuffer[0]));
+      out.add(wrappedBuffer(bbs.toArray(new ByteBuffer[0])));
     }
 
     private ByteBuffer getPackHeader(NettyDataPack dataPack) {
@@ -114,7 +112,7 @@ public class NettyTransportCodec {
    * Protocol decoder which converts Netty's ChannelBuffer to NettyDataPack which
    * contains a List&lt;ByteBuffer&gt; needed by Avro Responder.
    */
-  public static class NettyFrameDecoder extends FrameDecoder {
+  public static class NettyFrameDecoder extends ByteToMessageDecoder {
     private boolean packHeaderRead = false;
     private int listSize;
     private NettyDataPack dataPack;
@@ -129,26 +127,23 @@ public class NettyTransportCodec {
      * decode buffer to NettyDataPack
      */
     @Override
-    protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception {
-
+    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
+      if (!in.isReadable()) {
+        return;
+      }
       if (!packHeaderRead) {
-        if (decodePackHeader(ctx, channel, buffer)) {
+        if (decodePackHeader(ctx, in)) {
           packHeaderRead = true;
         }
-        return null;
       } else {
-        if (decodePackBody(ctx, channel, buffer)) {
+        if (decodePackBody(ctx, in)) {
           packHeaderRead = false; // reset state
-          return dataPack;
-        } else {
-          return null;
+          out.add(dataPack);
         }
       }
-
     }
 
-    private boolean decodePackHeader(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer)
-        throws Exception {
+    private boolean decodePackHeader(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
       if (buffer.readableBytes() < 8) {
         return false;
       }
@@ -159,7 +154,6 @@ public class NettyTransportCodec {
       // Sanity check to reduce likelihood of invalid requests being honored.
       // Only allow 10% of available memory to go towards this list (too much!)
       if (listSize * SIZEOF_REF > 0.1 * maxMem) {
-        channel.close().await();
         throw new AvroRuntimeException(
             "Excessively large list allocation " + "request detected: " + listSize + " items! Connection closed.");
       }
@@ -170,7 +164,7 @@ public class NettyTransportCodec {
       return true;
     }
 
-    private boolean decodePackBody(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception {
+    private boolean decodePackBody(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
       if (buffer.readableBytes() < 4) {
         return false;
       }
diff --git a/lang/java/ipc-netty/src/test/java/org/apache/avro/ipc/netty/TestNettyServer.java b/lang/java/ipc-netty/src/test/java/org/apache/avro/ipc/netty/TestNettyServer.java
index 35a4084..1ec9313 100644
--- a/lang/java/ipc-netty/src/test/java/org/apache/avro/ipc/netty/TestNettyServer.java
+++ b/lang/java/ipc-netty/src/test/java/org/apache/avro/ipc/netty/TestNettyServer.java
@@ -22,11 +22,14 @@ import java.io.IOException;
 import java.io.OutputStream;
 import static org.junit.Assert.assertEquals;
 
+import io.netty.channel.socket.SocketChannel;
+
 import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.nio.charset.StandardCharsets;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
 
 import org.junit.Assert;
 import org.apache.avro.ipc.Responder;
@@ -41,11 +44,12 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 
 public class TestNettyServer {
-  static final long CONNECT_TIMEOUT_MILLIS = 2000; // 2 sec
-  private static Server server;
-  private static Transceiver transceiver;
-  private static Mail proxy;
-  private static MailImpl mailService;
+  static final int CONNECT_TIMEOUT_MILLIS = 2000; // 2 sec
+  protected static Server server;
+  protected static Transceiver transceiver;
+  protected static Mail proxy;
+  protected static MailImpl mailService;
+  protected static Consumer<SocketChannel> channelInitializer;
 
   public static class MailImpl implements Mail {
 
@@ -74,28 +78,30 @@ public class TestNettyServer {
     }
   }
 
-  @BeforeClass
-  public static void initializeConnections() throws Exception {
-    // start server
+  public static void initializeConnections(Consumer<SocketChannel> initializer) throws Exception {
+    initializeConnections(initializer, initializer);
+  }
+
+  public static void initializeConnections(Consumer<SocketChannel> serverInitializer,
+      Consumer<SocketChannel> transceiverInitializer) throws Exception {
     System.out.println("starting server...");
+    channelInitializer = transceiverInitializer;
     mailService = new MailImpl();
     Responder responder = new SpecificResponder(Mail.class, mailService);
-    server = initializeServer(responder);
+    server = new NettyServer(responder, new InetSocketAddress(0), serverInitializer);
     server.start();
 
     int serverPort = server.getPort();
     System.out.println("server port : " + serverPort);
 
-    transceiver = initializeTransceiver(serverPort);
+    transceiver = new NettyTransceiver(new InetSocketAddress(serverPort), CONNECT_TIMEOUT_MILLIS,
+        transceiverInitializer);
     proxy = SpecificRequestor.getClient(Mail.class, transceiver);
   }
 
-  protected static Server initializeServer(Responder responder) {
-    return new NettyServer(responder, new InetSocketAddress(0));
-  }
-
-  protected static Transceiver initializeTransceiver(int serverPort) throws IOException {
-    return new NettyTransceiver(new InetSocketAddress(serverPort), CONNECT_TIMEOUT_MILLIS);
+  @BeforeClass
+  public static void initializeConnections() throws Exception {
+    initializeConnections(null);
   }
 
   @AfterClass
@@ -139,7 +145,8 @@ public class TestNettyServer {
 
   @Test
   public void testConnectionsCount() throws Exception {
-    Transceiver transceiver2 = new NettyTransceiver(new InetSocketAddress(server.getPort()), CONNECT_TIMEOUT_MILLIS);
+    Transceiver transceiver2 = new NettyTransceiver(new InetSocketAddress(server.getPort()), CONNECT_TIMEOUT_MILLIS,
+        channelInitializer);
     Mail proxy2 = SpecificRequestor.getClient(Mail.class, transceiver2);
     proxy.fireandforget(createMessage());
     proxy2.fireandforget(createMessage());
@@ -168,14 +175,15 @@ public class TestNettyServer {
     int port = server.getPort();
     String msg = "GET /status HTTP/1.1\n\n";
     InetSocketAddress sockAddr = new InetSocketAddress("127.0.0.1", port);
-    Socket sock = new Socket();
-    sock.connect(sockAddr);
-    OutputStream out = sock.getOutputStream();
-    out.write(msg.getBytes(StandardCharsets.UTF_8));
-    out.flush();
-    byte[] buf = new byte[2048];
-    int bytesRead = sock.getInputStream().read(buf);
-    Assert.assertTrue("Connection should have been closed", bytesRead == -1);
+    try (Socket sock = new Socket()) {
+      sock.connect(sockAddr);
+      OutputStream out = sock.getOutputStream();
+      out.write(msg.getBytes(StandardCharsets.UTF_8));
+      out.flush();
+      byte[] buf = new byte[2048];
+      int bytesRead = sock.getInputStream().read(buf);
+      Assert.assertTrue("Connection should have been closed: " + bytesRead, bytesRead == -1);
+    }
   }
 
 }
diff --git a/lang/java/ipc-netty/src/test/java/org/apache/avro/ipc/netty/TestNettyServerConcurrentExecution.java b/lang/java/ipc-netty/src/test/java/org/apache/avro/ipc/netty/TestNettyServerConcurrentExecution.java
index e3cb5a1..e5102f1 100644
--- a/lang/java/ipc-netty/src/test/java/org/apache/avro/ipc/netty/TestNettyServerConcurrentExecution.java
+++ b/lang/java/ipc-netty/src/test/java/org/apache/avro/ipc/netty/TestNettyServerConcurrentExecution.java
@@ -22,7 +22,6 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executors;
 
 import org.apache.avro.ipc.Server;
 import org.apache.avro.ipc.Transceiver;
@@ -31,8 +30,6 @@ import org.apache.avro.ipc.specific.SpecificResponder;
 import org.apache.avro.test.Simple;
 import org.apache.avro.test.TestError;
 import org.apache.avro.test.TestRecord;
-import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
-import org.jboss.netty.handler.execution.ExecutionHandler;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Test;
@@ -81,9 +78,7 @@ public class TestNettyServerConcurrentExecution {
   @Test(timeout = 30000)
   public void test() throws Exception {
     final CountDownLatch waitLatch = new CountDownLatch(1);
-    server = new NettyServer(new SpecificResponder(Simple.class, new SimpleImpl(waitLatch)), new InetSocketAddress(0),
-        new NioServerSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool()),
-        new ExecutionHandler(Executors.newCachedThreadPool()));
+    server = new NettyServer(new SpecificResponder(Simple.class, new SimpleImpl(waitLatch)), new InetSocketAddress(0));
     server.start();
 
     transceiver = new NettyTransceiver(new InetSocketAddress(server.getPort()), TestNettyServer.CONNECT_TIMEOUT_MILLIS);
@@ -122,6 +117,7 @@ public class TestNettyServerConcurrentExecution {
 
     // 4. If control reaches here, both RPCs have executed concurrently
     Assert.assertEquals("wait", response);
+    Thread.sleep(2000);
   }
 
   /**
@@ -146,6 +142,7 @@ public class TestNettyServerConcurrentExecution {
     @Override
     public int add(int arg1, int arg2) {
       // Step 1:
+      System.out.println("Adding " + arg1 + "+" + arg2);
       return arg1 + arg2;
     }
 
diff --git a/lang/java/ipc-netty/src/test/java/org/apache/avro/ipc/netty/TestNettyServerWithCallbacks.java b/lang/java/ipc-netty/src/test/java/org/apache/avro/ipc/netty/TestNettyServerWithCallbacks.java
index a3f673c..8ed6a15 100644
--- a/lang/java/ipc-netty/src/test/java/org/apache/avro/ipc/netty/TestNettyServerWithCallbacks.java
+++ b/lang/java/ipc-netty/src/test/java/org/apache/avro/ipc/netty/TestNettyServerWithCallbacks.java
@@ -33,7 +33,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
-import org.apache.avro.AvroRemoteException;
 import org.apache.avro.AvroRuntimeException;
 import org.apache.avro.ipc.CallFuture;
 import org.apache.avro.ipc.Callback;
@@ -212,9 +211,6 @@ public class TestNettyServerWithCallbacks {
       Assert.fail("Expected " + TestError.class.getCanonicalName());
     } catch (TestError e) {
       // Expected
-    } catch (AvroRemoteException e) {
-      e.printStackTrace();
-      Assert.fail("Unexpected error: " + e.toString());
     }
 
     // Test asynchronous RPC (future):
@@ -404,6 +400,7 @@ public class TestNettyServerWithCallbacks {
           simpleClient2.add(3, 4);
           Assert.fail("Expected an exception");
         } catch (Exception e) {
+          e.printStackTrace();
           // expected
         }
       });
diff --git a/lang/java/ipc-netty/src/test/java/org/apache/avro/ipc/netty/TestNettyServerWithCompression.java b/lang/java/ipc-netty/src/test/java/org/apache/avro/ipc/netty/TestNettyServerWithCompression.java
index e2e2dbd..1de68ec 100644
--- a/lang/java/ipc-netty/src/test/java/org/apache/avro/ipc/netty/TestNettyServerWithCompression.java
+++ b/lang/java/ipc-netty/src/test/java/org/apache/avro/ipc/netty/TestNettyServerWithCompression.java
@@ -18,69 +18,28 @@
 package org.apache.avro.ipc.netty;
 
 import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.concurrent.Executors;
 
-import org.apache.avro.ipc.Responder;
-import org.apache.avro.ipc.Server;
-import org.apache.avro.ipc.Transceiver;
-import org.jboss.netty.channel.ChannelFactory;
-import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.ChannelPipelineFactory;
-import org.jboss.netty.channel.Channels;
-import org.jboss.netty.channel.socket.SocketChannel;
-import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
-import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
-import org.jboss.netty.handler.codec.compression.ZlibDecoder;
-import org.jboss.netty.handler.codec.compression.ZlibEncoder;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
 
-public class TestNettyServerWithCompression extends TestNettyServer {
-
-  protected static Server initializeServer(Responder responder) {
-    ChannelFactory channelFactory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(),
-        Executors.newCachedThreadPool());
-    return new NettyServer(responder, new InetSocketAddress(0), channelFactory, new CompressionChannelPipelineFactory(),
-        null);
-  }
+import io.netty.handler.codec.compression.JdkZlibDecoder;
+import io.netty.handler.codec.compression.JdkZlibEncoder;
 
-  protected static Transceiver initializeTransceiver(int serverPort) throws IOException {
-    return new NettyTransceiver(new InetSocketAddress(serverPort), new CompressionChannelFactory(),
-        CONNECT_TIMEOUT_MILLIS);
-  }
-
-  /**
-   * Factory of Compression-enabled client channels
-   */
-  private static class CompressionChannelFactory extends NioClientSocketChannelFactory {
-    public CompressionChannelFactory() {
-      super(Executors.newCachedThreadPool(), Executors.newCachedThreadPool());
-    }
+public class TestNettyServerWithCompression extends TestNettyServer {
 
-    @Override
-    public SocketChannel newChannel(ChannelPipeline pipeline) {
-      try {
-        ZlibEncoder encoder = new ZlibEncoder(6);
-        pipeline.addFirst("deflater", encoder);
-        pipeline.addFirst("inflater", new ZlibDecoder());
-        return super.newChannel(pipeline);
-      } catch (Exception ex) {
-        throw new RuntimeException("Cannot create Compression channel", ex);
-      }
-    }
+  @BeforeClass
+  public static void initializeConnections() throws Exception {
+    initializeConnections(ch -> {
+      ch.pipeline().addFirst("deflater", new JdkZlibEncoder(6));
+      ch.pipeline().addFirst("inflater", new JdkZlibDecoder());
+    });
   }
 
-  /**
-   * Factory of Compression-enabled server worker channel pipelines
-   */
-  private static class CompressionChannelPipelineFactory implements ChannelPipelineFactory {
-
-    @Override
-    public ChannelPipeline getPipeline() throws Exception {
-      ChannelPipeline pipeline = Channels.pipeline();
-      ZlibEncoder encoder = new ZlibEncoder(6);
-      pipeline.addFirst("deflater", encoder);
-      pipeline.addFirst("inflater", new ZlibDecoder());
-      return pipeline;
-    }
+  @Ignore
+  @Override
+  public void testBadRequest() throws IOException {
+    // This test in the base class needs to be skipped here: the
+    // compression/decompression handlers write the gzip header out before the
+    // stream closes, so the stream is not completely empty.
   }
 }
diff --git a/lang/java/ipc-netty/src/test/java/org/apache/avro/ipc/netty/TestNettyServerWithSSL.java b/lang/java/ipc-netty/src/test/java/org/apache/avro/ipc/netty/TestNettyServerWithSSL.java
index c5f744f..2e9651b 100644
--- a/lang/java/ipc-netty/src/test/java/org/apache/avro/ipc/netty/TestNettyServerWithSSL.java
+++ b/lang/java/ipc-netty/src/test/java/org/apache/avro/ipc/netty/TestNettyServerWithSSL.java
@@ -18,65 +18,43 @@
 
 package org.apache.avro.ipc.netty;
 
-import java.io.IOException;
-import java.net.InetSocketAddress;
 import java.security.KeyStore;
 import java.security.Security;
 import java.security.cert.X509Certificate;
-import java.util.concurrent.Executors;
 import javax.net.ssl.KeyManagerFactory;
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLEngine;
 import javax.net.ssl.TrustManager;
 import javax.net.ssl.X509TrustManager;
 
-import org.apache.avro.ipc.Responder;
-import org.apache.avro.ipc.Server;
-import org.apache.avro.ipc.Transceiver;
-import org.jboss.netty.channel.ChannelFactory;
-import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.ChannelPipelineFactory;
-import org.jboss.netty.channel.Channels;
-import org.jboss.netty.channel.socket.SocketChannel;
-import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
-import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
-import org.jboss.netty.handler.ssl.SslHandler;
+import org.junit.BeforeClass;
+
+import io.netty.handler.ssl.SslHandler;
 
 public class TestNettyServerWithSSL extends TestNettyServer {
   public static final String TEST_CERTIFICATE = "servercert.p12";
   public static final String TEST_CERTIFICATE_PASSWORD = "s3cret";
 
-  protected static Server initializeServer(Responder responder) {
-    ChannelFactory channelFactory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(),
-        Executors.newCachedThreadPool());
-    return new NettyServer(responder, new InetSocketAddress(0), channelFactory, new SSLChannelPipelineFactory(), null);
-  }
-
-  protected static Transceiver initializeTransceiver(int serverPort) throws IOException {
-    return new NettyTransceiver(new InetSocketAddress(serverPort), new SSLChannelFactory(), CONNECT_TIMEOUT_MILLIS);
-  }
-
-  /**
-   * Factory of SSL-enabled client channels
-   */
-  private static class SSLChannelFactory extends NioClientSocketChannelFactory {
-    public SSLChannelFactory() {
-      super(Executors.newCachedThreadPool(), Executors.newCachedThreadPool());
-    }
-
-    @Override
-    public SocketChannel newChannel(ChannelPipeline pipeline) {
+  @BeforeClass
+  public static void initializeConnections() throws Exception {
+    initializeConnections(ch -> {
+      SSLEngine sslEngine = createServerSSLContext().createSSLEngine();
+      sslEngine.setUseClientMode(false);
+      SslHandler handler = new SslHandler(sslEngine, false);
+      ch.pipeline().addLast("SSL", handler);
+    }, ch -> {
       try {
         SSLContext sslContext = SSLContext.getInstance("TLS");
         sslContext.init(null, new TrustManager[] { new BogusTrustManager() }, null);
         SSLEngine sslEngine = sslContext.createSSLEngine();
         sslEngine.setUseClientMode(true);
-        pipeline.addFirst("ssl", new SslHandler(sslEngine));
-        return super.newChannel(pipeline);
-      } catch (Exception ex) {
-        throw new RuntimeException("Cannot create SSL channel", ex);
+
+        SslHandler handler = new SslHandler(sslEngine, false);
+        ch.pipeline().addLast("SSL", handler);
+      } catch (Exception e) {
+        e.printStackTrace();
       }
-    }
+    });
   }
 
   /**
@@ -99,44 +77,29 @@ public class TestNettyServerWithSSL extends TestNettyServer {
     }
   }
 
-  /**
-   * Factory of SSL-enabled server worker channel pipelines
-   */
-  private static class SSLChannelPipelineFactory implements ChannelPipelineFactory {
-
-    private SSLContext createServerSSLContext() {
-      try {
-        KeyStore ks = KeyStore.getInstance("PKCS12");
-        ks.load(TestNettyServer.class.getResource(TEST_CERTIFICATE).openStream(),
-            TEST_CERTIFICATE_PASSWORD.toCharArray());
-
-        // Set up key manager factory to use our key store
-        KeyManagerFactory kmf = KeyManagerFactory.getInstance(getAlgorithm());
-        kmf.init(ks, TEST_CERTIFICATE_PASSWORD.toCharArray());
-
-        SSLContext serverContext = SSLContext.getInstance("TLS");
-        serverContext.init(kmf.getKeyManagers(), null, null);
-        return serverContext;
-      } catch (Exception e) {
-        throw new Error("Failed to initialize the server-side SSLContext", e);
-      }
-    }
-
-    private String getAlgorithm() {
-      String algorithm = Security.getProperty("ssl.KeyManagerFactory.algorithm");
-      if (algorithm == null) {
-        algorithm = "SunX509";
-      }
-      return algorithm;
+  private static SSLContext createServerSSLContext() {
+    try {
+      KeyStore ks = KeyStore.getInstance("PKCS12");
+      ks.load(TestNettyServer.class.getResource(TEST_CERTIFICATE).openStream(),
+          TEST_CERTIFICATE_PASSWORD.toCharArray());
+
+      // Set up key manager factory to use our key store
+      KeyManagerFactory kmf = KeyManagerFactory.getInstance(getAlgorithm());
+      kmf.init(ks, TEST_CERTIFICATE_PASSWORD.toCharArray());
+
+      SSLContext serverContext = SSLContext.getInstance("TLS");
+      serverContext.init(kmf.getKeyManagers(), null, null);
+      return serverContext;
+    } catch (Exception e) {
+      throw new Error("Failed to initialize the server-side SSLContext", e);
     }
+  }
 
-    @Override
-    public ChannelPipeline getPipeline() throws Exception {
-      ChannelPipeline pipeline = Channels.pipeline();
-      SSLEngine sslEngine = createServerSSLContext().createSSLEngine();
-      sslEngine.setUseClientMode(false);
-      pipeline.addLast("ssl", new SslHandler(sslEngine));
-      return pipeline;
+  private static String getAlgorithm() {
+    String algorithm = Security.getProperty("ssl.KeyManagerFactory.algorithm");
+    if (algorithm == null) {
+      algorithm = "SunX509";
     }
+    return algorithm;
   }
 }
diff --git a/lang/java/ipc-netty/src/test/java/org/apache/avro/ipc/netty/NettyTransceiverWhenFailsToConnect.java b/lang/java/ipc-netty/src/test/java/org/apache/avro/ipc/netty/TestNettyTransceiverWhenFailsToConnect.java
similarity index 50%
rename from lang/java/ipc-netty/src/test/java/org/apache/avro/ipc/netty/NettyTransceiverWhenFailsToConnect.java
rename to lang/java/ipc-netty/src/test/java/org/apache/avro/ipc/netty/TestNettyTransceiverWhenFailsToConnect.java
index 76b59b7..53b9b2d 100644
--- a/lang/java/ipc-netty/src/test/java/org/apache/avro/ipc/netty/NettyTransceiverWhenFailsToConnect.java
+++ b/lang/java/ipc-netty/src/test/java/org/apache/avro/ipc/netty/TestNettyTransceiverWhenFailsToConnect.java
@@ -17,55 +17,34 @@
  */
 package org.apache.avro.ipc.netty;
 
-import org.jboss.netty.channel.ChannelFactory;
-import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.socket.SocketChannel;
-import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
+import org.apache.avro.ipc.Transceiver;
+
+import org.junit.Assert;
 import org.junit.Test;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.ServerSocket;
 
-import static org.junit.Assert.assertFalse;
+import io.netty.channel.socket.SocketChannel;
 
 /**
  * This is a very specific test that verifies that if the NettyTransceiver fails
  * to connect it cleans up the netty channel that it has created.
  */
-public class NettyTransceiverWhenFailsToConnect {
+public class TestNettyTransceiverWhenFailsToConnect {
+  SocketChannel channel = null;
 
   @Test(expected = IOException.class)
   public void testNettyTransceiverReleasesNettyChannelOnFailingToConnect() throws Exception {
-
-    LastChannelRememberingChannelFactory socketChannelFactory = null;
     try (ServerSocket serverSocket = new ServerSocket(0)) {
-      socketChannelFactory = new LastChannelRememberingChannelFactory();
-
-      try {
-        new NettyTransceiver(new InetSocketAddress(serverSocket.getLocalPort()), socketChannelFactory, 1L);
-      } finally {
-        assertFalse("expected that the channel opened by the transceiver is closed",
-            socketChannelFactory.lastChannel.isOpen());
+      try (Transceiver t = new NettyTransceiver(new InetSocketAddress(serverSocket.getLocalPort()), 1, c -> {
+        channel = c;
+      })) {
+        Assert.fail("should have thrown an exception");
       }
     } finally {
-
-      // closing the server socket will actually free up the open channel in the
-      // transceiver, which would have hung otherwise (pre AVRO-1407)
-
-      if (socketChannelFactory != null) {
-        socketChannelFactory.releaseExternalResources();
-      }
-    }
-  }
-
-  class LastChannelRememberingChannelFactory extends NioClientSocketChannelFactory implements ChannelFactory {
-
-    volatile SocketChannel lastChannel;
-
-    @Override
-    public SocketChannel newChannel(ChannelPipeline pipeline) {
-      return lastChannel = super.newChannel(pipeline);
+      Assert.assertTrue("Channel not shut down", channel.isShutdown());
     }
   }
 }
diff --git a/lang/java/ipc-netty/src/test/java/org/apache/avro/ipc/netty/TestNettyTransceiverWhenServerStops.java b/lang/java/ipc-netty/src/test/java/org/apache/avro/ipc/netty/TestNettyTransceiverWhenServerStops.java
index 853afcb..a659f5a 100644
--- a/lang/java/ipc-netty/src/test/java/org/apache/avro/ipc/netty/TestNettyTransceiverWhenServerStops.java
+++ b/lang/java/ipc-netty/src/test/java/org/apache/avro/ipc/netty/TestNettyTransceiverWhenServerStops.java
@@ -41,7 +41,7 @@ public class TestNettyTransceiverWhenServerStops {
 
     int serverPort = server.getPort();
 
-    final NettyTransceiver transceiver = new NettyTransceiver(new InetSocketAddress(serverPort), 60000L);
+    final NettyTransceiver transceiver = new NettyTransceiver(new InetSocketAddress(serverPort), 60000);
     final Mail mail = SpecificRequestor.getClient(Mail.class, transceiver);
 
     final AtomicInteger successes = new AtomicInteger();
diff --git a/lang/java/ipc-netty/src/test/java/org/apache/avro/ipc/netty/TestProtocolNetty.java b/lang/java/ipc-netty/src/test/java/org/apache/avro/ipc/netty/TestProtocolNetty.java
index 25eaeb1..383625d 100644
--- a/lang/java/ipc-netty/src/test/java/org/apache/avro/ipc/netty/TestProtocolNetty.java
+++ b/lang/java/ipc-netty/src/test/java/org/apache/avro/ipc/netty/TestProtocolNetty.java
@@ -36,7 +36,7 @@ public class TestProtocolNetty extends TestProtocolSpecific {
 
   @Override
   public Transceiver createTransceiver() throws Exception {
-    return new NettyTransceiver(new InetSocketAddress(server.getPort()), 2000L);
+    return new NettyTransceiver(new InetSocketAddress(server.getPort()), 2000);
   }
 
   @Override
diff --git a/lang/java/ipc-netty/src/test/resources/org/apache/avro/ipc/servercert.p12 b/lang/java/ipc-netty/src/test/resources/org/apache/avro/ipc/netty/servercert.p12
similarity index 100%
rename from lang/java/ipc-netty/src/test/resources/org/apache/avro/ipc/servercert.p12
rename to lang/java/ipc-netty/src/test/resources/org/apache/avro/ipc/netty/servercert.p12
diff --git a/lang/java/pom.xml b/lang/java/pom.xml
index 3a07441..a4a3f96 100644
--- a/lang/java/pom.xml
+++ b/lang/java/pom.xml
@@ -46,7 +46,7 @@
     <jetty.version>9.4.28.v20200408</jetty.version>
     <jopt-simple.version>5.0.4</jopt-simple.version>
     <junit.version>4.12</junit.version>
-    <netty.version>3.10.6.Final</netty.version>
+    <netty.version>4.1.50.Final</netty.version>
     <protobuf.version>3.11.1</protobuf.version>
     <thrift.version>0.12.0</thrift.version>
     <slf4j.version>1.7.30</slf4j.version>
@@ -61,7 +61,6 @@
     <easymock.version>4.0.2</easymock.version>
     <hamcrest.version>2.1</hamcrest.version>
     <grpc.version>1.28.0</grpc.version>
-    <netty-codec-http2.version>4.1.45.Final</netty-codec-http2.version>
     <zstd-jni.version>1.4.4-11</zstd-jni.version>
     <!-- version properties for plugins -->
     <archetype-plugin.version>3.0.1</archetype-plugin.version>
@@ -532,11 +531,6 @@
           <version>${servlet-api.version}</version>
       </dependency>
       <dependency>
-        <groupId>io.netty</groupId>
-        <artifactId>netty</artifactId>
-        <version>${netty.version}</version>
-      </dependency>
-      <dependency>
         <groupId>net.sf.jopt-simple</groupId>
         <artifactId>jopt-simple</artifactId>
         <version>${jopt-simple.version}</version>
@@ -589,11 +583,6 @@
         <version>${grpc.version}</version>
       </dependency>
       <dependency>
-        <groupId>io.netty</groupId>
-        <artifactId>netty-codec-http2</artifactId>
-        <version>${netty-codec-http2.version}</version>
-      </dependency>
-      <dependency>
         <groupId>org.tukaani</groupId>
         <artifactId>xz</artifactId>
         <version>${tukaani.version}</version>