You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by fr...@apache.org on 2016/09/08 12:36:16 UTC

svn commit: r1759798 - in /jackrabbit/oak/trunk/oak-segment-tar: pom.xml src/main/java/org/apache/jackrabbit/oak/segment/standby/client/SegmentLoaderHandler.java src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/ReplyDecoder.java

Author: frm
Date: Thu Sep  8 12:36:16 2016
New Revision: 1759798

URL: http://svn.apache.org/viewvc?rev=1759798&view=rev
Log:
OAK-4775 - Upgrade to Netty 4.0.41.Final

Modified:
    jackrabbit/oak/trunk/oak-segment-tar/pom.xml
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/SegmentLoaderHandler.java
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/ReplyDecoder.java

Modified: jackrabbit/oak/trunk/oak-segment-tar/pom.xml
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/pom.xml?rev=1759798&r1=1759797&r2=1759798&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/pom.xml (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/pom.xml Thu Sep  8 12:36:16 2016
@@ -35,7 +35,7 @@
 
     <properties>
         <oak.version>1.5.5</oak.version>
-        <netty.version>4.0.23.Final</netty.version>
+        <netty.version>4.0.41.Final</netty.version>
     </properties>
 
     <scm>

Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/SegmentLoaderHandler.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/SegmentLoaderHandler.java?rev=1759798&r1=1759797&r2=1759798&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/SegmentLoaderHandler.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/SegmentLoaderHandler.java Thu Sep  8 12:36:16 2016
@@ -91,15 +91,20 @@ public class SegmentLoaderHandler extend
 
     @Override
     public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
-        if (evt instanceof SegmentReply) {
-            onSegmentReply((SegmentReply) evt);
-        }
-
         if (evt instanceof String) {
             onCommand((String) evt);
         }
     }
 
+    @Override
+    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+        if (msg instanceof SegmentReply) {
+            onSegmentReply((SegmentReply) msg);
+        } else {
+            ctx.fireChannelRead(msg);
+        }
+    }
+
     private void onSegmentReply(SegmentReply reply) {
         // Offer the reply from the I/O thread, unblocking the sync thread.
         segment.offer(reply);

Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/ReplyDecoder.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/ReplyDecoder.java?rev=1759798&r1=1759797&r2=1759798&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/ReplyDecoder.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/ReplyDecoder.java Thu Sep  8 12:36:16 2016
@@ -38,16 +38,18 @@ import org.slf4j.LoggerFactory;
 
 public class ReplyDecoder extends ReplayingDecoder<DecodingState> {
 
+    private static final int REPLY_HEADER_SIZE = 25;
+
     public enum DecodingState {
         HEADER, SEGMENT, BLOB
     }
 
-    private static final Logger log = LoggerFactory
-            .getLogger(ReplyDecoder.class);
+    private static final Logger log = LoggerFactory.getLogger(ReplyDecoder.class);
 
     private final StandbyStore store;
 
     private int length = -1;
+
     private byte type = -1;
 
     public ReplyDecoder(StandbyStore store) {
@@ -62,79 +64,96 @@ public class ReplyDecoder extends Replay
     }
 
     @Override
-    protected void decode(ChannelHandlerContext ctx, ByteBuf in,
-            List<Object> out) throws Exception {
-
+    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
         switch (state()) {
-        case HEADER: {
-            length = in.readInt();
-            type = in.readByte();
-            switch (type) {
-            case Messages.HEADER_SEGMENT:
-                checkpoint(DecodingState.SEGMENT);
-                break;
-            case Messages.HEADER_BLOB:
-                checkpoint(DecodingState.BLOB);
-                break;
-            default:
-                throw new Exception("Unknown type: " + type);
+            case HEADER: {
+                log.debug("Parsing header");
+                length = in.readInt();
+                type = in.readByte();
+                switch (type) {
+                    case Messages.HEADER_SEGMENT:
+                        checkpoint(DecodingState.SEGMENT);
+                        break;
+                    case Messages.HEADER_BLOB:
+                        checkpoint(DecodingState.BLOB);
+                        break;
+                    default:
+                        throw new Exception("Unknown type: " + type);
+                }
+                return;
             }
-            return;
-        }
 
-        case SEGMENT: {
-            Segment s = decodeSegment(in, length, type);
-            if (s != null) {
-                out.add(SegmentReply.empty());
-                ctx.fireUserEventTriggered(new SegmentReply(s));
-                reset();
+            case SEGMENT: {
+                log.debug("Parsing segment");
+                Segment s = decodeSegment(in, length, type);
+                if (s != null) {
+                    out.add(new SegmentReply(s));
+                    reset();
+                }
+                return;
             }
-            return;
-        }
 
-        case BLOB: {
-            IdArrayBasedBlob b = decodeBlob(in, length, type);
-            if (b != null) {
-                out.add(SegmentReply.empty());
-                ctx.fireUserEventTriggered(new SegmentReply(b));
-                reset();
+            case BLOB: {
+                log.debug("Parsing blob");
+                IdArrayBasedBlob b = decodeBlob(in, length, type);
+                if (b != null) {
+                    out.add(new SegmentReply(b));
+                    reset();
+                }
+                return;
             }
-            return;
-        }
 
-        default:
-            throw new Exception("Unknown decoding state: " + state());
+            default:
+                log.error("Message state unknown");
+                throw new Exception("Unknown decoding state: " + state());
         }
     }
 
     private Segment decodeSegment(ByteBuf in, int len, byte type) {
+        log.debug("Decoding segment, length={}, type={}", len - REPLY_HEADER_SIZE, type);
+
         long msb = in.readLong();
         long lsb = in.readLong();
         long hash = in.readLong();
 
+        if (log.isDebugEnabled()) {
+            log.debug("Decoding segment, id={}", new UUID(msb, lsb));
+        }
+
         // #readBytes throws a 'REPLAY' exception if there are not enough bytes
         // available for reading
-        ByteBuf data = in.readBytes(len - 25);
-        byte[] segment;
-        if (data.hasArray()) {
-            segment = data.array();
-        } else {
-            segment = new byte[len - 25];
-            in.readBytes(segment);
+        byte[] segment = readSegmentBytes(in.readBytes(len - REPLY_HEADER_SIZE));
+
+        if (log.isDebugEnabled()) {
+            log.debug("Verifying segment, id={}", new UUID(msb, lsb));
         }
 
         Hasher hasher = Hashing.murmur3_32().newHasher();
         long check = hasher.putBytes(segment).hash().padToLong();
+
         if (hash == check) {
             SegmentId id = store.newSegmentId(msb, lsb);
-            Segment s = store.newSegment(id, ByteBuffer.wrap(segment));
-            log.debug("received segment with id {} and size {}", id, s.size());
-            return s;
+            log.debug("Segment verified, id={}", id);
+            return store.newSegment(id, ByteBuffer.wrap(segment));
         }
-        log.debug("received corrupted segment {}, ignoring", new UUID(msb, lsb));
+
+        if (log.isDebugEnabled()) {
+            log.debug("Segment corrupted, id={}", new UUID(msb, lsb));
+        }
+
         return null;
     }
 
+    private byte[] readSegmentBytes(ByteBuf data) {
+        if (data.hasArray()) {
+            return data.array();
+        }
+
+        byte[] result = new byte[data.readableBytes()];
+        data.readBytes(result);
+        return result;
+    }
+
     private IdArrayBasedBlob decodeBlob(ByteBuf in, int length, byte type) {
         int inIdLen = in.readInt();
         byte[] bid = new byte[inIdLen];