You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@bookkeeper.apache.org by GitBox <gi...@apache.org> on 2018/03/26 11:25:26 UTC

[GitHub] ivankelly commented on a change in pull request #1293: Implement directly ChannelOutboundHandlerAdapter in BookieProtoEncoding#ResponseEncoder

ivankelly commented on a change in pull request #1293: Implement directly ChannelOutboundHandlerAdapter in BookieProtoEncoding#ResponseEncoder
URL: https://github.com/apache/bookkeeper/pull/1293#discussion_r177057026
 
 

 ##########
 File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
 ##########
 @@ -355,122 +356,127 @@ private static ByteBuf serializeProtobuf(MessageLite msg, ByteBufAllocator alloc
     }
 
     @Sharable
-    public static class RequestEncoder extends MessageToMessageEncoder<Object> {
+    public static class RequestEncoder extends ChannelOutboundHandlerAdapter {
 
-        final EnDecoder REQ_PREV3;
-        final EnDecoder REQ_V3;
+        final EnDecoder reqPreV3;
+        final EnDecoder reqV3;
 
         public RequestEncoder(ExtensionRegistry extensionRegistry) {
-            REQ_PREV3 = new RequestEnDeCoderPreV3(extensionRegistry);
-            REQ_V3 = new RequestEnDecoderV3(extensionRegistry);
+            reqPreV3 = new RequestEnDeCoderPreV3(extensionRegistry);
+            reqV3 = new RequestEnDecoderV3(extensionRegistry);
         }
-
+        
         @Override
-        protected void encode(ChannelHandlerContext ctx, Object msg, List<Object> out) throws Exception {
+        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Encode request {} to channel {}.", msg, ctx.channel());
+            }
             if (msg instanceof BookkeeperProtocol.Request) {
-                out.add(REQ_V3.encode(msg, ctx.alloc()));
+                ctx.write(reqV3.encode(msg, ctx.alloc()), promise);
             } else if (msg instanceof BookieProtocol.Request) {
-                out.add(REQ_PREV3.encode(msg, ctx.alloc()));
+                ctx.write(reqPreV3.encode(msg, ctx.alloc()), promise);
             } else {
                 LOG.error("Invalid request to encode to {}: {}", ctx.channel(), msg.getClass().getName());
-                out.add(msg);
+                ctx.write(msg, promise);
             }
         }
     }
 
     @Sharable
-    public static class RequestDecoder extends MessageToMessageDecoder<Object> {
-        final EnDecoder REQ_PREV3;
-        final EnDecoder REQ_V3;
+    public static class RequestDecoder extends ChannelInboundHandlerAdapter {
+        final EnDecoder reqPreV3;
+        final EnDecoder reqV3;
         boolean usingV3Protocol;
 
         RequestDecoder(ExtensionRegistry extensionRegistry) {
-            REQ_PREV3 = new RequestEnDeCoderPreV3(extensionRegistry);
-            REQ_V3 = new RequestEnDecoderV3(extensionRegistry);
+            reqPreV3 = new RequestEnDeCoderPreV3(extensionRegistry);
+            reqV3 = new RequestEnDecoderV3(extensionRegistry);
             usingV3Protocol = true;
         }
 
         @Override
-        protected void decode(ChannelHandlerContext ctx, Object msg, List<Object> out) throws Exception {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Received request {} from channel {} to decode.", msg, ctx.channel());
+        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Received request {} from channel {} to decode.", msg, ctx.channel());
             }
-            if (!(msg instanceof ByteBuf)) {
-                out.add(msg);
-                return;
-            }
-            ByteBuf buffer = (ByteBuf) msg;
-            buffer.markReaderIndex();
-
-            if (usingV3Protocol) {
-                try {
-                    out.add(REQ_V3.decode(buffer));
-                } catch (InvalidProtocolBufferException e) {
-                    usingV3Protocol = false;
-                    buffer.resetReaderIndex();
-                    out.add(REQ_PREV3.decode(buffer));
+            try {
+                if (!(msg instanceof ByteBuf)) {
+                    LOG.error("Received invalid request {} from channel {} to decode.", msg, ctx.channel());
+                    ctx.fireChannelRead(msg);
+                    return;
                 }
-            } else {
-                out.add(REQ_PREV3.decode(buffer));
+                ByteBuf buffer = (ByteBuf) msg;
+                buffer.markReaderIndex();
+                Object result;
+                if (usingV3Protocol) {
+                    try {
+                        result = reqV3.decode(buffer);
+                    } catch (InvalidProtocolBufferException e) {
+                        usingV3Protocol = false;
+                        buffer.resetReaderIndex();
+                        result = reqPreV3.decode(buffer);
+                    }
+                } else {
+                    result = reqPreV3.decode(buffer);
+                }
+                ctx.fireChannelRead(result);
+            } finally {
+                ReferenceCountUtil.release(msg);
             }
         }
     }
 
-    @Sharable
-    public static class ResponseEncoder extends MessageToMessageEncoder<Object> {
-        final EnDecoder REP_PREV3;
-        final EnDecoder REP_V3;
+    @Sharable    
 
 Review comment:
   Why isn't checkstyle complaining about the trailing whitespace?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services