You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@bookkeeper.apache.org by GitBox <gi...@apache.org> on 2018/03/26 13:54:41 UTC

[GitHub] eolivelli closed pull request #1293: Implement directly ChannelOutboundHandlerAdapter in BookieProtoEncoding#ResponseEncoder

eolivelli closed pull request #1293: Implement directly ChannelOutboundHandlerAdapter in BookieProtoEncoding#ResponseEncoder
URL: https://github.com/apache/bookkeeper/pull/1293
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
index 15920ddff..7a18a052b 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
@@ -20,21 +20,10 @@
  */
 package org.apache.bookkeeper.proto;
 
-import java.io.IOException;
-import java.security.NoSuchAlgorithmException;
-import java.util.List;
-
-import org.apache.bookkeeper.client.MacDigestManager;
-import org.apache.bookkeeper.proto.BookieProtocol.PacketHeader;
-import org.apache.bookkeeper.util.DoubleByteBuf;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import com.google.protobuf.CodedOutputStream;
 import com.google.protobuf.ExtensionRegistry;
 import com.google.protobuf.InvalidProtocolBufferException;
 import com.google.protobuf.MessageLite;
-
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
 import io.netty.buffer.ByteBufInputStream;
@@ -42,9 +31,21 @@
 import io.netty.buffer.Unpooled;
 import io.netty.channel.ChannelHandler.Sharable;
 import io.netty.channel.ChannelHandlerContext;
-import io.netty.handler.codec.MessageToMessageDecoder;
-import io.netty.handler.codec.MessageToMessageEncoder;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.ChannelOutboundHandlerAdapter;
+import io.netty.channel.ChannelPromise;
+import io.netty.util.ReferenceCountUtil;
+import java.io.IOException;
+import java.security.NoSuchAlgorithmException;
+import org.apache.bookkeeper.client.MacDigestManager;
+import org.apache.bookkeeper.util.DoubleByteBuf;
+import org.apache.bookkeeper.proto.BookieProtocol.PacketHeader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+/**
+ * A class for encoding and decoding the Bookkeeper protocol.
+ */
 public class BookieProtoEncoding {
     private static final Logger LOG = LoggerFactory.getLogger(BookieProtoEncoding.class);
 
@@ -355,122 +356,127 @@ private static ByteBuf serializeProtobuf(MessageLite msg, ByteBufAllocator alloc
     }
 
     @Sharable
-    public static class RequestEncoder extends MessageToMessageEncoder<Object> {
+    public static class RequestEncoder extends ChannelOutboundHandlerAdapter {
 
-        final EnDecoder REQ_PREV3;
-        final EnDecoder REQ_V3;
+        final EnDecoder reqPreV3;
+        final EnDecoder reqV3;
 
         public RequestEncoder(ExtensionRegistry extensionRegistry) {
-            REQ_PREV3 = new RequestEnDeCoderPreV3(extensionRegistry);
-            REQ_V3 = new RequestEnDecoderV3(extensionRegistry);
+            reqPreV3 = new RequestEnDeCoderPreV3(extensionRegistry);
+            reqV3 = new RequestEnDecoderV3(extensionRegistry);
         }
 
         @Override
-        protected void encode(ChannelHandlerContext ctx, Object msg, List<Object> out) throws Exception {
+        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Encode request {} to channel {}.", msg, ctx.channel());
+            }
             if (msg instanceof BookkeeperProtocol.Request) {
-                out.add(REQ_V3.encode(msg, ctx.alloc()));
+                ctx.write(reqV3.encode(msg, ctx.alloc()), promise);
             } else if (msg instanceof BookieProtocol.Request) {
-                out.add(REQ_PREV3.encode(msg, ctx.alloc()));
+                ctx.write(reqPreV3.encode(msg, ctx.alloc()), promise);
             } else {
                 LOG.error("Invalid request to encode to {}: {}", ctx.channel(), msg.getClass().getName());
-                out.add(msg);
+                ctx.write(msg, promise);
             }
         }
     }
 
     @Sharable
-    public static class RequestDecoder extends MessageToMessageDecoder<Object> {
-        final EnDecoder REQ_PREV3;
-        final EnDecoder REQ_V3;
+    public static class RequestDecoder extends ChannelInboundHandlerAdapter {
+        final EnDecoder reqPreV3;
+        final EnDecoder reqV3;
         boolean usingV3Protocol;
 
         RequestDecoder(ExtensionRegistry extensionRegistry) {
-            REQ_PREV3 = new RequestEnDeCoderPreV3(extensionRegistry);
-            REQ_V3 = new RequestEnDecoderV3(extensionRegistry);
+            reqPreV3 = new RequestEnDeCoderPreV3(extensionRegistry);
+            reqV3 = new RequestEnDecoderV3(extensionRegistry);
             usingV3Protocol = true;
         }
 
         @Override
-        protected void decode(ChannelHandlerContext ctx, Object msg, List<Object> out) throws Exception {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Received request {} from channel {} to decode.", msg, ctx.channel());
+        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Received request {} from channel {} to decode.", msg, ctx.channel());
             }
-            if (!(msg instanceof ByteBuf)) {
-                out.add(msg);
-                return;
-            }
-            ByteBuf buffer = (ByteBuf) msg;
-            buffer.markReaderIndex();
-
-            if (usingV3Protocol) {
-                try {
-                    out.add(REQ_V3.decode(buffer));
-                } catch (InvalidProtocolBufferException e) {
-                    usingV3Protocol = false;
-                    buffer.resetReaderIndex();
-                    out.add(REQ_PREV3.decode(buffer));
+            try {
+                if (!(msg instanceof ByteBuf)) {
+                    LOG.error("Received invalid request {} from channel {} to decode.", msg, ctx.channel());
+                    ctx.fireChannelRead(msg);
+                    return;
                 }
-            } else {
-                out.add(REQ_PREV3.decode(buffer));
+                ByteBuf buffer = (ByteBuf) msg;
+                buffer.markReaderIndex();
+                Object result;
+                if (usingV3Protocol) {
+                    try {
+                        result = reqV3.decode(buffer);
+                    } catch (InvalidProtocolBufferException e) {
+                        usingV3Protocol = false;
+                        buffer.resetReaderIndex();
+                        result = reqPreV3.decode(buffer);
+                    }
+                } else {
+                    result = reqPreV3.decode(buffer);
+                }
+                ctx.fireChannelRead(result);
+            } finally {
+                ReferenceCountUtil.release(msg);
             }
         }
     }
 
     @Sharable
-    public static class ResponseEncoder extends MessageToMessageEncoder<Object> {
-        final EnDecoder REP_PREV3;
-        final EnDecoder REP_V3;
+    public static class ResponseEncoder extends ChannelOutboundHandlerAdapter {
+        final EnDecoder repPreV3;
+        final EnDecoder repV3;
 
         ResponseEncoder(ExtensionRegistry extensionRegistry) {
-            REP_PREV3 = new ResponseEnDeCoderPreV3(extensionRegistry);
-            REP_V3 = new ResponseEnDecoderV3(extensionRegistry);
+            repPreV3 = new ResponseEnDeCoderPreV3(extensionRegistry);
+            repV3 = new ResponseEnDecoderV3(extensionRegistry);
         }
 
         @Override
-        protected void encode(ChannelHandlerContext ctx, Object msg, List<Object> out)
-                throws Exception {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Encode response {} to channel {}.", msg, ctx.channel());
+        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Encode response {} to channel {}.", msg, ctx.channel());
             }
             if (msg instanceof BookkeeperProtocol.Response) {
-                out.add(REP_V3.encode(msg, ctx.alloc()));
+                ctx.write(repV3.encode(msg, ctx.alloc()), promise);
             } else if (msg instanceof BookieProtocol.Response) {
-                out.add(REP_PREV3.encode(msg, ctx.alloc()));
+                ctx.write(repPreV3.encode(msg, ctx.alloc()), promise);
             } else {
                 LOG.error("Invalid response to encode to {}: {}", ctx.channel(), msg.getClass().getName());
-                out.add(msg);
+                ctx.write(msg, promise);
             }
         }
     }
 
     @Sharable
-    public static class ResponseDecoder extends MessageToMessageDecoder<Object> {
-        final EnDecoder REP_PREV3;
-        final EnDecoder REP_V3;
-        boolean usingV2Protocol;
+    public static class ResponseDecoder extends ChannelInboundHandlerAdapter {
+        final EnDecoder rep;
 
         ResponseDecoder(ExtensionRegistry extensionRegistry, boolean useV2Protocol) {
-            REP_PREV3 = new ResponseEnDeCoderPreV3(extensionRegistry);
-            REP_V3 = new ResponseEnDecoderV3(extensionRegistry);
-            usingV2Protocol = useV2Protocol;
+            rep = useV2Protocol
+                    ? new ResponseEnDeCoderPreV3(extensionRegistry) : new ResponseEnDecoderV3(extensionRegistry);
         }
 
         @Override
-        protected void decode(ChannelHandlerContext ctx, Object msg, List<Object> out) throws Exception {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Received response {} from channel {} to decode.", msg, ctx.channel());
+        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Received response {} from channel {} to decode.", msg, ctx.channel());
             }
-            if (!(msg instanceof ByteBuf)) {
-                out.add(msg);
-            }
-            ByteBuf buffer = (ByteBuf) msg;
-            buffer.markReaderIndex();
-
-            if (!usingV2Protocol) {
-                out.add(REP_V3.decode(buffer));
-            } else {
-                // If in the same connection we already got preV3 messages, don't try again to decode V3 messages
-                out.add(REP_PREV3.decode(buffer));
+            try {
+                if (!(msg instanceof ByteBuf)) {
+                    LOG.error("Received invalid response {} from channel {} to decode.", msg, ctx.channel());
+                    ctx.fireChannelRead(msg);
+                    return;
+                }
+                ByteBuf buffer = (ByteBuf) msg;
+                buffer.markReaderIndex();
+                ctx.fireChannelRead(rep.decode(buffer));
+            } finally {
+                ReferenceCountUtil.release(msg);
             }
         }
     }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieProtoEncodingTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieProtoEncodingTest.java
index f1823719b..6f9cc9aeb 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieProtoEncodingTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieProtoEncodingTest.java
@@ -21,7 +21,9 @@
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 import com.google.common.collect.Lists;
 import com.google.protobuf.ByteString;
@@ -71,16 +73,19 @@ public void testV3ResponseDecoderNoFallback() throws Exception {
             .build();
 
         List<Object> outList = Lists.newArrayList();
+        ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
+        when(ctx.fireChannelRead(any())).thenAnswer((iom) -> {
+                outList.add(iom.getArgument(0));
+                return null;
+        });
 
         ResponseEnDeCoderPreV3 v2Encoder = new ResponseEnDeCoderPreV3(null);
         ResponseEnDecoderV3 v3Encoder = new ResponseEnDecoderV3(null);
 
         ResponseDecoder v3Decoder = new ResponseDecoder(null, false);
         try {
-            v3Decoder.decode(
-                mock(ChannelHandlerContext.class),
-                v2Encoder.encode(v2Resp, UnpooledByteBufAllocator.DEFAULT),
-                outList
+            v3Decoder.channelRead(ctx,
+                v2Encoder.encode(v2Resp, UnpooledByteBufAllocator.DEFAULT)
             );
             fail("V3 response decoder should fail on decoding v2 response");
         } catch (InvalidProtocolBufferException e) {
@@ -88,10 +93,9 @@ public void testV3ResponseDecoderNoFallback() throws Exception {
         }
         assertEquals(0, outList.size());
 
-        v3Decoder.decode(
-            mock(ChannelHandlerContext.class),
-            v3Encoder.encode(v3Resp, UnpooledByteBufAllocator.DEFAULT),
-            outList);
+        v3Decoder.channelRead(
+            ctx,
+            v3Encoder.encode(v3Resp, UnpooledByteBufAllocator.DEFAULT));
         assertEquals(1, outList.size());
     }
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services