You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by fr...@apache.org on 2016/09/08 12:36:16 UTC
svn commit: r1759798 - in /jackrabbit/oak/trunk/oak-segment-tar: pom.xml
src/main/java/org/apache/jackrabbit/oak/segment/standby/client/SegmentLoaderHandler.java
src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/ReplyDecoder.java
Author: frm
Date: Thu Sep 8 12:36:16 2016
New Revision: 1759798
URL: http://svn.apache.org/viewvc?rev=1759798&view=rev
Log:
OAK-4775 - Upgrade to Netty 4.0.41.Final
Modified:
jackrabbit/oak/trunk/oak-segment-tar/pom.xml
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/SegmentLoaderHandler.java
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/ReplyDecoder.java
Modified: jackrabbit/oak/trunk/oak-segment-tar/pom.xml
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/pom.xml?rev=1759798&r1=1759797&r2=1759798&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/pom.xml (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/pom.xml Thu Sep 8 12:36:16 2016
@@ -35,7 +35,7 @@
<properties>
<oak.version>1.5.5</oak.version>
- <netty.version>4.0.23.Final</netty.version>
+ <netty.version>4.0.41.Final</netty.version>
</properties>
<scm>
Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/SegmentLoaderHandler.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/SegmentLoaderHandler.java?rev=1759798&r1=1759797&r2=1759798&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/SegmentLoaderHandler.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/SegmentLoaderHandler.java Thu Sep 8 12:36:16 2016
@@ -91,15 +91,20 @@ public class SegmentLoaderHandler extend
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
- if (evt instanceof SegmentReply) {
- onSegmentReply((SegmentReply) evt);
- }
-
if (evt instanceof String) {
onCommand((String) evt);
}
}
+ @Override
+ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+ if (msg instanceof SegmentReply) {
+ onSegmentReply((SegmentReply) msg);
+ } else {
+ ctx.fireChannelRead(msg);
+ }
+ }
+
private void onSegmentReply(SegmentReply reply) {
// Offer the reply from the I/O thread, unblocking the sync thread.
segment.offer(reply);
Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/ReplyDecoder.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/ReplyDecoder.java?rev=1759798&r1=1759797&r2=1759798&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/ReplyDecoder.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/ReplyDecoder.java Thu Sep 8 12:36:16 2016
@@ -38,16 +38,18 @@ import org.slf4j.LoggerFactory;
public class ReplyDecoder extends ReplayingDecoder<DecodingState> {
+ private static final int REPLY_HEADER_SIZE = 25;
+
public enum DecodingState {
HEADER, SEGMENT, BLOB
}
- private static final Logger log = LoggerFactory
- .getLogger(ReplyDecoder.class);
+ private static final Logger log = LoggerFactory.getLogger(ReplyDecoder.class);
private final StandbyStore store;
private int length = -1;
+
private byte type = -1;
public ReplyDecoder(StandbyStore store) {
@@ -62,79 +64,96 @@ public class ReplyDecoder extends Replay
}
@Override
- protected void decode(ChannelHandlerContext ctx, ByteBuf in,
- List<Object> out) throws Exception {
-
+ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
switch (state()) {
- case HEADER: {
- length = in.readInt();
- type = in.readByte();
- switch (type) {
- case Messages.HEADER_SEGMENT:
- checkpoint(DecodingState.SEGMENT);
- break;
- case Messages.HEADER_BLOB:
- checkpoint(DecodingState.BLOB);
- break;
- default:
- throw new Exception("Unknown type: " + type);
+ case HEADER: {
+ log.debug("Parsing header");
+ length = in.readInt();
+ type = in.readByte();
+ switch (type) {
+ case Messages.HEADER_SEGMENT:
+ checkpoint(DecodingState.SEGMENT);
+ break;
+ case Messages.HEADER_BLOB:
+ checkpoint(DecodingState.BLOB);
+ break;
+ default:
+ throw new Exception("Unknown type: " + type);
+ }
+ return;
}
- return;
- }
- case SEGMENT: {
- Segment s = decodeSegment(in, length, type);
- if (s != null) {
- out.add(SegmentReply.empty());
- ctx.fireUserEventTriggered(new SegmentReply(s));
- reset();
+ case SEGMENT: {
+ log.debug("Parsing segment");
+ Segment s = decodeSegment(in, length, type);
+ if (s != null) {
+ out.add(new SegmentReply(s));
+ reset();
+ }
+ return;
}
- return;
- }
- case BLOB: {
- IdArrayBasedBlob b = decodeBlob(in, length, type);
- if (b != null) {
- out.add(SegmentReply.empty());
- ctx.fireUserEventTriggered(new SegmentReply(b));
- reset();
+ case BLOB: {
+ log.debug("Parsing blob");
+ IdArrayBasedBlob b = decodeBlob(in, length, type);
+ if (b != null) {
+ out.add(new SegmentReply(b));
+ reset();
+ }
+ return;
}
- return;
- }
- default:
- throw new Exception("Unknown decoding state: " + state());
+ default:
+ log.error("Message state unknown");
+ throw new Exception("Unknown decoding state: " + state());
}
}
private Segment decodeSegment(ByteBuf in, int len, byte type) {
+ log.debug("Decoding segment, length={}, type={}", len - REPLY_HEADER_SIZE, type);
+
long msb = in.readLong();
long lsb = in.readLong();
long hash = in.readLong();
+ if (log.isDebugEnabled()) {
+ log.debug("Decoding segment, id={}", new UUID(msb, lsb));
+ }
+
// #readBytes throws a 'REPLAY' exception if there are not enough bytes
// available for reading
- ByteBuf data = in.readBytes(len - 25);
- byte[] segment;
- if (data.hasArray()) {
- segment = data.array();
- } else {
- segment = new byte[len - 25];
- in.readBytes(segment);
+ byte[] segment = readSegmentBytes(in.readBytes(len - REPLY_HEADER_SIZE));
+
+ if (log.isDebugEnabled()) {
+ log.debug("Verifying segment, id={}", new UUID(msb, lsb));
}
Hasher hasher = Hashing.murmur3_32().newHasher();
long check = hasher.putBytes(segment).hash().padToLong();
+
if (hash == check) {
SegmentId id = store.newSegmentId(msb, lsb);
- Segment s = store.newSegment(id, ByteBuffer.wrap(segment));
- log.debug("received segment with id {} and size {}", id, s.size());
- return s;
+ log.debug("Segment verified, id={}", id);
+ return store.newSegment(id, ByteBuffer.wrap(segment));
}
- log.debug("received corrupted segment {}, ignoring", new UUID(msb, lsb));
+
+ if (log.isDebugEnabled()) {
+ log.debug("Segment corrupted, id={}", new UUID(msb, lsb));
+ }
+
return null;
}
+ private byte[] readSegmentBytes(ByteBuf data) {
+ if (data.hasArray()) {
+ return data.array();
+ }
+
+ byte[] result = new byte[data.readableBytes()];
+ data.readBytes(result);
+ return result;
+ }
+
private IdArrayBasedBlob decodeBlob(ByteBuf in, int length, byte type) {
int inIdLen = in.readInt();
byte[] bid = new byte[inIdLen];