You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by eo...@apache.org on 2018/03/26 09:40:36 UTC

[bookkeeper] branch master updated: Implement directly ChannelOutboundHandlerAdapter in BookieProtoEncoding#ResponseEncoder

This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new a36f1ab  Implement directly ChannelOutboundHandlerAdapter in BookieProtoEncoding#ResponseEncoder
a36f1ab is described below

commit a36f1ab88034d6b1bdfe75e725886c2e8cca1e85
Author: Enrico Olivelli <eo...@apache.org>
AuthorDate: Mon Mar 26 11:40:26 2018 +0200

    Implement directly ChannelOutboundHandlerAdapter in BookieProtoEncoding#ResponseEncoder
    
    This change is mostly a clean up/refactor which drops intermediate MessageToMessageEncoder and MessageToMessageDecoder from BookieProtoEncoding
    
    Author: Enrico Olivelli <eo...@apache.org>
    Author: Enrico Olivelli <eo...@gmail.com>
    
    Reviewers: Ivan Kelly <iv...@apache.org>, Jia Zhai <None>
    
    This closes #1286 from eolivelli/fix/java9-response-corrupted
---
 .../bookkeeper/proto/BookieProtoEncoding.java      | 105 +++++++++++----------
 .../bookkeeper/proto/BookieProtoEncodingTest.java  |  20 ++--
 2 files changed, 66 insertions(+), 59 deletions(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
index a45f94e..151a799 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
@@ -32,12 +32,13 @@ import io.netty.buffer.ByteBufOutputStream;
 import io.netty.buffer.Unpooled;
 import io.netty.channel.ChannelHandler.Sharable;
 import io.netty.channel.ChannelHandlerContext;
-import io.netty.handler.codec.MessageToMessageDecoder;
-import io.netty.handler.codec.MessageToMessageEncoder;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.ChannelOutboundHandlerAdapter;
+import io.netty.channel.ChannelPromise;
+import io.netty.util.ReferenceCountUtil;
 
 import java.io.IOException;
 import java.security.NoSuchAlgorithmException;
-import java.util.List;
 
 import org.apache.bookkeeper.proto.BookieProtocol.PacketHeader;
 import org.apache.bookkeeper.proto.checksum.MacDigestManager;
@@ -371,7 +372,7 @@ public class BookieProtoEncoding {
      * A request message encoder.
      */
     @Sharable
-    public static class RequestEncoder extends MessageToMessageEncoder<Object> {
+    public static class RequestEncoder extends ChannelOutboundHandlerAdapter {
 
         final EnDecoder reqPreV3;
         final EnDecoder reqV3;
@@ -382,17 +383,17 @@ public class BookieProtoEncoding {
         }
 
         @Override
-        protected void encode(ChannelHandlerContext ctx, Object msg, List<Object> out) throws Exception {
+        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
             if (LOG.isTraceEnabled()) {
                 LOG.trace("Encode request {} to channel {}.", msg, ctx.channel());
             }
             if (msg instanceof BookkeeperProtocol.Request) {
-                out.add(reqV3.encode(msg, ctx.alloc()));
+                ctx.write(reqV3.encode(msg, ctx.alloc()), promise);
             } else if (msg instanceof BookieProtocol.Request) {
-                out.add(reqPreV3.encode(msg, ctx.alloc()));
+                ctx.write(reqPreV3.encode(msg, ctx.alloc()), promise);
             } else {
                 LOG.error("Invalid request to encode to {}: {}", ctx.channel(), msg.getClass().getName());
-                out.add(msg);
+                ctx.write(msg, promise);
             }
         }
     }
@@ -401,7 +402,7 @@ public class BookieProtoEncoding {
      * A request message decoder.
      */
     @Sharable
-    public static class RequestDecoder extends MessageToMessageDecoder<Object> {
+    public static class RequestDecoder extends ChannelInboundHandlerAdapter {
         final EnDecoder reqPreV3;
         final EnDecoder reqV3;
         boolean usingV3Protocol;
@@ -413,27 +414,33 @@ public class BookieProtoEncoding {
         }
 
         @Override
-        protected void decode(ChannelHandlerContext ctx, Object msg, List<Object> out) throws Exception {
+        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
             if (LOG.isTraceEnabled()) {
                 LOG.trace("Received request {} from channel {} to decode.", msg, ctx.channel());
             }
-            if (!(msg instanceof ByteBuf)) {
-                out.add(msg);
-                return;
-            }
-            ByteBuf buffer = (ByteBuf) msg;
-            buffer.markReaderIndex();
-
-            if (usingV3Protocol) {
-                try {
-                    out.add(reqV3.decode(buffer));
-                } catch (InvalidProtocolBufferException e) {
-                    usingV3Protocol = false;
-                    buffer.resetReaderIndex();
-                    out.add(reqPreV3.decode(buffer));
+            try {
+                if (!(msg instanceof ByteBuf)) {
+                    LOG.error("Received invalid request {} from channel {} to decode.", msg, ctx.channel());
+                    ctx.fireChannelRead(msg);
+                    return;
                 }
-            } else {
-                out.add(reqPreV3.decode(buffer));
+                ByteBuf buffer = (ByteBuf) msg;
+                buffer.markReaderIndex();
+                Object result;
+                if (usingV3Protocol) {
+                    try {
+                        result = reqV3.decode(buffer);
+                    } catch (InvalidProtocolBufferException e) {
+                        usingV3Protocol = false;
+                        buffer.resetReaderIndex();
+                        result = reqPreV3.decode(buffer);
+                    }
+                } else {
+                    result = reqPreV3.decode(buffer);
+                }
+                ctx.fireChannelRead(result);
+            } finally {
+                ReferenceCountUtil.release(msg);
             }
         }
     }
@@ -442,7 +449,7 @@ public class BookieProtoEncoding {
      * A response message encoder.
      */
     @Sharable
-    public static class ResponseEncoder extends MessageToMessageEncoder<Object> {
+    public static class ResponseEncoder extends ChannelOutboundHandlerAdapter {
         final EnDecoder repPreV3;
         final EnDecoder repV3;
 
@@ -452,18 +459,17 @@ public class BookieProtoEncoding {
         }
 
         @Override
-        protected void encode(ChannelHandlerContext ctx, Object msg, List<Object> out)
-                throws Exception {
+        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
             if (LOG.isTraceEnabled()) {
                 LOG.trace("Encode response {} to channel {}.", msg, ctx.channel());
             }
             if (msg instanceof BookkeeperProtocol.Response) {
-                out.add(repV3.encode(msg, ctx.alloc()));
+                ctx.write(repV3.encode(msg, ctx.alloc()), promise);
             } else if (msg instanceof BookieProtocol.Response) {
-                out.add(repPreV3.encode(msg, ctx.alloc()));
+                ctx.write(repPreV3.encode(msg, ctx.alloc()), promise);
             } else {
                 LOG.error("Invalid response to encode to {}: {}", ctx.channel(), msg.getClass().getName());
-                out.add(msg);
+                ctx.write(msg, promise);
             }
         }
     }
@@ -472,33 +478,30 @@ public class BookieProtoEncoding {
      * A response message decoder.
      */
     @Sharable
-    public static class ResponseDecoder extends MessageToMessageDecoder<Object> {
-        final EnDecoder repPreV3;
-        final EnDecoder repV3;
-        boolean usingV2Protocol;
+    public static class ResponseDecoder extends ChannelInboundHandlerAdapter {
+        final EnDecoder rep;
 
         ResponseDecoder(ExtensionRegistry extensionRegistry, boolean useV2Protocol) {
-            repPreV3 = new ResponseEnDeCoderPreV3(extensionRegistry);
-            repV3 = new ResponseEnDecoderV3(extensionRegistry);
-            usingV2Protocol = useV2Protocol;
+            rep = useV2Protocol
+                    ? new ResponseEnDeCoderPreV3(extensionRegistry) : new ResponseEnDecoderV3(extensionRegistry);
         }
 
         @Override
-        protected void decode(ChannelHandlerContext ctx, Object msg, List<Object> out) throws Exception {
+        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
             if (LOG.isTraceEnabled()) {
                 LOG.trace("Received response {} from channel {} to decode.", msg, ctx.channel());
             }
-            if (!(msg instanceof ByteBuf)) {
-                out.add(msg);
-            }
-            ByteBuf buffer = (ByteBuf) msg;
-            buffer.markReaderIndex();
-
-            if (!usingV2Protocol) {
-                out.add(repV3.decode(buffer));
-            } else {
-                // If in the same connection we already got preV3 messages, don't try again to decode V3 messages
-                out.add(repPreV3.decode(buffer));
+            try {
+                if (!(msg instanceof ByteBuf)) {
+                    LOG.error("Received invalid response {} from channel {} to decode.", msg, ctx.channel());
+                    ctx.fireChannelRead(msg);
+                    return;
+                }
+                ByteBuf buffer = (ByteBuf) msg;
+                buffer.markReaderIndex();
+                ctx.fireChannelRead(rep.decode(buffer));
+            } finally {
+                ReferenceCountUtil.release(msg);
             }
         }
     }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieProtoEncodingTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieProtoEncodingTest.java
index f182371..6f9cc9a 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieProtoEncodingTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieProtoEncodingTest.java
@@ -21,7 +21,9 @@ package org.apache.bookkeeper.proto;
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 import com.google.common.collect.Lists;
 import com.google.protobuf.ByteString;
@@ -71,16 +73,19 @@ public class BookieProtoEncodingTest {
             .build();
 
         List<Object> outList = Lists.newArrayList();
+        ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
+        when(ctx.fireChannelRead(any())).thenAnswer((iom) -> {
+                outList.add(iom.getArgument(0));
+                return null;
+        });
 
         ResponseEnDeCoderPreV3 v2Encoder = new ResponseEnDeCoderPreV3(null);
         ResponseEnDecoderV3 v3Encoder = new ResponseEnDecoderV3(null);
 
         ResponseDecoder v3Decoder = new ResponseDecoder(null, false);
         try {
-            v3Decoder.decode(
-                mock(ChannelHandlerContext.class),
-                v2Encoder.encode(v2Resp, UnpooledByteBufAllocator.DEFAULT),
-                outList
+            v3Decoder.channelRead(ctx,
+                v2Encoder.encode(v2Resp, UnpooledByteBufAllocator.DEFAULT)
             );
             fail("V3 response decoder should fail on decoding v2 response");
         } catch (InvalidProtocolBufferException e) {
@@ -88,10 +93,9 @@ public class BookieProtoEncodingTest {
         }
         assertEquals(0, outList.size());
 
-        v3Decoder.decode(
-            mock(ChannelHandlerContext.class),
-            v3Encoder.encode(v3Resp, UnpooledByteBufAllocator.DEFAULT),
-            outList);
+        v3Decoder.channelRead(
+            ctx,
+            v3Encoder.encode(v3Resp, UnpooledByteBufAllocator.DEFAULT));
         assertEquals(1, outList.size());
     }
 

-- 
To stop receiving notification emails like this one, please contact
eolivelli@apache.org.