You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by eo...@apache.org on 2018/03/26 09:40:36 UTC
[bookkeeper] branch master updated: Implement directly
ChannelOutboundHandlerAdapter in BookieProtoEncoding#ResponseEncoder
This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new a36f1ab Implement directly ChannelOutboundHandlerAdapter in BookieProtoEncoding#ResponseEncoder
a36f1ab is described below
commit a36f1ab88034d6b1bdfe75e725886c2e8cca1e85
Author: Enrico Olivelli <eo...@apache.org>
AuthorDate: Mon Mar 26 11:40:26 2018 +0200
Implement directly ChannelOutboundHandlerAdapter in BookieProtoEncoding#ResponseEncoder
This change is mostly a cleanup/refactoring that drops the intermediate MessageToMessageEncoder and MessageToMessageDecoder base classes from BookieProtoEncoding.
Author: Enrico Olivelli <eo...@apache.org>
Author: Enrico Olivelli <eo...@gmail.com>
Reviewers: Ivan Kelly <iv...@apache.org>, Jia Zhai <None>
This closes #1286 from eolivelli/fix/java9-response-corrupted
---
.../bookkeeper/proto/BookieProtoEncoding.java | 105 +++++++++++----------
.../bookkeeper/proto/BookieProtoEncodingTest.java | 20 ++--
2 files changed, 66 insertions(+), 59 deletions(-)
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
index a45f94e..151a799 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
@@ -32,12 +32,13 @@ import io.netty.buffer.ByteBufOutputStream;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
-import io.netty.handler.codec.MessageToMessageDecoder;
-import io.netty.handler.codec.MessageToMessageEncoder;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.ChannelOutboundHandlerAdapter;
+import io.netty.channel.ChannelPromise;
+import io.netty.util.ReferenceCountUtil;
import java.io.IOException;
import java.security.NoSuchAlgorithmException;
-import java.util.List;
import org.apache.bookkeeper.proto.BookieProtocol.PacketHeader;
import org.apache.bookkeeper.proto.checksum.MacDigestManager;
@@ -371,7 +372,7 @@ public class BookieProtoEncoding {
* A request message encoder.
*/
@Sharable
- public static class RequestEncoder extends MessageToMessageEncoder<Object> {
+ public static class RequestEncoder extends ChannelOutboundHandlerAdapter {
final EnDecoder reqPreV3;
final EnDecoder reqV3;
@@ -382,17 +383,17 @@ public class BookieProtoEncoding {
}
@Override
- protected void encode(ChannelHandlerContext ctx, Object msg, List<Object> out) throws Exception {
+ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
if (LOG.isTraceEnabled()) {
LOG.trace("Encode request {} to channel {}.", msg, ctx.channel());
}
if (msg instanceof BookkeeperProtocol.Request) {
- out.add(reqV3.encode(msg, ctx.alloc()));
+ ctx.write(reqV3.encode(msg, ctx.alloc()), promise);
} else if (msg instanceof BookieProtocol.Request) {
- out.add(reqPreV3.encode(msg, ctx.alloc()));
+ ctx.write(reqPreV3.encode(msg, ctx.alloc()), promise);
} else {
LOG.error("Invalid request to encode to {}: {}", ctx.channel(), msg.getClass().getName());
- out.add(msg);
+ ctx.write(msg, promise);
}
}
}
@@ -401,7 +402,7 @@ public class BookieProtoEncoding {
* A request message decoder.
*/
@Sharable
- public static class RequestDecoder extends MessageToMessageDecoder<Object> {
+ public static class RequestDecoder extends ChannelInboundHandlerAdapter {
final EnDecoder reqPreV3;
final EnDecoder reqV3;
boolean usingV3Protocol;
@@ -413,27 +414,33 @@ public class BookieProtoEncoding {
}
@Override
- protected void decode(ChannelHandlerContext ctx, Object msg, List<Object> out) throws Exception {
+ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (LOG.isTraceEnabled()) {
LOG.trace("Received request {} from channel {} to decode.", msg, ctx.channel());
}
- if (!(msg instanceof ByteBuf)) {
- out.add(msg);
- return;
- }
- ByteBuf buffer = (ByteBuf) msg;
- buffer.markReaderIndex();
-
- if (usingV3Protocol) {
- try {
- out.add(reqV3.decode(buffer));
- } catch (InvalidProtocolBufferException e) {
- usingV3Protocol = false;
- buffer.resetReaderIndex();
- out.add(reqPreV3.decode(buffer));
+ try {
+ if (!(msg instanceof ByteBuf)) {
+ LOG.error("Received invalid request {} from channel {} to decode.", msg, ctx.channel());
+ ctx.fireChannelRead(msg);
+ return;
}
- } else {
- out.add(reqPreV3.decode(buffer));
+ ByteBuf buffer = (ByteBuf) msg;
+ buffer.markReaderIndex();
+ Object result;
+ if (usingV3Protocol) {
+ try {
+ result = reqV3.decode(buffer);
+ } catch (InvalidProtocolBufferException e) {
+ usingV3Protocol = false;
+ buffer.resetReaderIndex();
+ result = reqPreV3.decode(buffer);
+ }
+ } else {
+ result = reqPreV3.decode(buffer);
+ }
+ ctx.fireChannelRead(result);
+ } finally {
+ ReferenceCountUtil.release(msg);
}
}
}
@@ -442,7 +449,7 @@ public class BookieProtoEncoding {
* A response message encoder.
*/
@Sharable
- public static class ResponseEncoder extends MessageToMessageEncoder<Object> {
+ public static class ResponseEncoder extends ChannelOutboundHandlerAdapter {
final EnDecoder repPreV3;
final EnDecoder repV3;
@@ -452,18 +459,17 @@ public class BookieProtoEncoding {
}
@Override
- protected void encode(ChannelHandlerContext ctx, Object msg, List<Object> out)
- throws Exception {
+ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
if (LOG.isTraceEnabled()) {
LOG.trace("Encode response {} to channel {}.", msg, ctx.channel());
}
if (msg instanceof BookkeeperProtocol.Response) {
- out.add(repV3.encode(msg, ctx.alloc()));
+ ctx.write(repV3.encode(msg, ctx.alloc()), promise);
} else if (msg instanceof BookieProtocol.Response) {
- out.add(repPreV3.encode(msg, ctx.alloc()));
+ ctx.write(repPreV3.encode(msg, ctx.alloc()), promise);
} else {
LOG.error("Invalid response to encode to {}: {}", ctx.channel(), msg.getClass().getName());
- out.add(msg);
+ ctx.write(msg, promise);
}
}
}
@@ -472,33 +478,30 @@ public class BookieProtoEncoding {
* A response message decoder.
*/
@Sharable
- public static class ResponseDecoder extends MessageToMessageDecoder<Object> {
- final EnDecoder repPreV3;
- final EnDecoder repV3;
- boolean usingV2Protocol;
+ public static class ResponseDecoder extends ChannelInboundHandlerAdapter {
+ final EnDecoder rep;
ResponseDecoder(ExtensionRegistry extensionRegistry, boolean useV2Protocol) {
- repPreV3 = new ResponseEnDeCoderPreV3(extensionRegistry);
- repV3 = new ResponseEnDecoderV3(extensionRegistry);
- usingV2Protocol = useV2Protocol;
+ rep = useV2Protocol
+ ? new ResponseEnDeCoderPreV3(extensionRegistry) : new ResponseEnDecoderV3(extensionRegistry);
}
@Override
- protected void decode(ChannelHandlerContext ctx, Object msg, List<Object> out) throws Exception {
+ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (LOG.isTraceEnabled()) {
LOG.trace("Received response {} from channel {} to decode.", msg, ctx.channel());
}
- if (!(msg instanceof ByteBuf)) {
- out.add(msg);
- }
- ByteBuf buffer = (ByteBuf) msg;
- buffer.markReaderIndex();
-
- if (!usingV2Protocol) {
- out.add(repV3.decode(buffer));
- } else {
- // If in the same connection we already got preV3 messages, don't try again to decode V3 messages
- out.add(repPreV3.decode(buffer));
+ try {
+ if (!(msg instanceof ByteBuf)) {
+ LOG.error("Received invalid response {} from channel {} to decode.", msg, ctx.channel());
+ ctx.fireChannelRead(msg);
+ return;
+ }
+ ByteBuf buffer = (ByteBuf) msg;
+ buffer.markReaderIndex();
+ ctx.fireChannelRead(rep.decode(buffer));
+ } finally {
+ ReferenceCountUtil.release(msg);
}
}
}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieProtoEncodingTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieProtoEncodingTest.java
index f182371..6f9cc9a 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieProtoEncodingTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieProtoEncodingTest.java
@@ -21,7 +21,9 @@ package org.apache.bookkeeper.proto;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
import com.google.common.collect.Lists;
import com.google.protobuf.ByteString;
@@ -71,16 +73,19 @@ public class BookieProtoEncodingTest {
.build();
List<Object> outList = Lists.newArrayList();
+ ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
+ when(ctx.fireChannelRead(any())).thenAnswer((iom) -> {
+ outList.add(iom.getArgument(0));
+ return null;
+ });
ResponseEnDeCoderPreV3 v2Encoder = new ResponseEnDeCoderPreV3(null);
ResponseEnDecoderV3 v3Encoder = new ResponseEnDecoderV3(null);
ResponseDecoder v3Decoder = new ResponseDecoder(null, false);
try {
- v3Decoder.decode(
- mock(ChannelHandlerContext.class),
- v2Encoder.encode(v2Resp, UnpooledByteBufAllocator.DEFAULT),
- outList
+ v3Decoder.channelRead(ctx,
+ v2Encoder.encode(v2Resp, UnpooledByteBufAllocator.DEFAULT)
);
fail("V3 response decoder should fail on decoding v2 response");
} catch (InvalidProtocolBufferException e) {
@@ -88,10 +93,9 @@ public class BookieProtoEncodingTest {
}
assertEquals(0, outList.size());
- v3Decoder.decode(
- mock(ChannelHandlerContext.class),
- v3Encoder.encode(v3Resp, UnpooledByteBufAllocator.DEFAULT),
- outList);
+ v3Decoder.channelRead(
+ ctx,
+ v3Encoder.encode(v3Resp, UnpooledByteBufAllocator.DEFAULT));
assertEquals(1, outList.size());
}
--
To stop receiving notification emails like this one, please contact
eolivelli@apache.org.