You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by al...@apache.org on 2014/07/01 12:17:23 UTC

svn commit: r1607031 - in /jackrabbit/oak/trunk/oak-tarmk-failover: ./ src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/ src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/codec/

Author: alexparvulescu
Date: Tue Jul  1 10:17:22 2014
New Revision: 1607031

URL: http://svn.apache.org/r1607031
Log:
OAK-1915 TarMK failover 2.0
 - added checksum verification to the segment transfer

Modified:
    jackrabbit/oak/trunk/oak-tarmk-failover/README.md
    jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/FailoverClient.java
    jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/codec/SegmentDecoder.java
    jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/codec/SegmentEncoder.java

Modified: jackrabbit/oak/trunk/oak-tarmk-failover/README.md
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-failover/README.md?rev=1607031&r1=1607030&r2=1607031&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-tarmk-failover/README.md (original)
+++ jackrabbit/oak/trunk/oak-tarmk-failover/README.md Tue Jul  1 10:17:22 2014
@@ -7,8 +7,8 @@ Failover
 The component should be installed when failover support is needed.
 
 The setup is expected to be: one master to one/many slaves nodes.
-The slave will periodically poll the master for the head state over http
-on a custom port, if it changed, it should pull in all the new segments.
+The slave will periodically poll the master for the head state, if this
+changed, it will pull in all the new segments since the last sync.
 
 Setup in OSGi
 -------------
@@ -24,7 +24,7 @@ Master host represents the master host i
 Interval represents how often the sync thread should run, in seconds.
 
 See examples in the osgi-conf folder for each run mode. To install a new OSGI config in the sling launcher,
-you only need to create a new folder called 'install' in the sling.home folder and copy the configs there.
+you only need to create a new folder called 'install' in the sling.home folder and copy the specific config there.
 
 TODO
 ----
@@ -32,7 +32,6 @@ TODO
   - timeout handling doesn't cover everything on both server and slave
   - error handling on the slave still has some issues (the slave hangs)
   - maybe enable compression of the segments over the wire
-  - maybe add a checksum to the segment encoder/decoder to verify the integrity of the transfer
   - slave runmode could possibly be a read-only mode (no writes permitted)
 
 License

Modified: jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/FailoverClient.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/FailoverClient.java?rev=1607031&r1=1607030&r2=1607031&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/FailoverClient.java (original)
+++ jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/FailoverClient.java Tue Jul  1 10:17:22 2014
@@ -80,12 +80,6 @@ public final class FailoverClient implem
             @Override
             public void initChannel(SocketChannel ch) throws Exception {
                 ChannelPipeline p = ch.pipeline();
-
-                // p.addLast(new LoggingHandler(LogLevel.INFO));
-                // Enable stream compression
-                // p.addLast(ZlibCodecFactory.newZlibEncoder(ZlibWrapper.GZIP));
-                // p.addLast(ZlibCodecFactory.newZlibDecoder(ZlibWrapper.GZIP));
-
                 // WriteTimeoutHandler & ReadTimeoutHandler
                 p.addLast("readTimeoutHandler", new ReadTimeoutHandler(
                         readTimeoutMs, TimeUnit.MILLISECONDS));

Modified: jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/codec/SegmentDecoder.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/codec/SegmentDecoder.java?rev=1607031&r1=1607030&r2=1607031&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/codec/SegmentDecoder.java (original)
+++ jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/codec/SegmentDecoder.java Tue Jul  1 10:17:22 2014
@@ -23,12 +23,18 @@ import io.netty.buffer.ByteBuf;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
 
+import java.nio.ByteBuffer;
+import java.util.UUID;
+
 import org.apache.jackrabbit.oak.plugins.segment.Segment;
 import org.apache.jackrabbit.oak.plugins.segment.SegmentId;
 import org.apache.jackrabbit.oak.plugins.segment.SegmentStore;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
+
 public class SegmentDecoder extends LengthFieldBasedFrameDecoder {
 
     private static final Logger log = LoggerFactory
@@ -37,7 +43,7 @@ public class SegmentDecoder extends Leng
     private final SegmentStore store;
 
     public SegmentDecoder(SegmentStore store) {
-        super(Segment.MAX_SEGMENT_SIZE + 21, 0, 4, 0, 4);
+        super(Segment.MAX_SEGMENT_SIZE + 21, 0, 4, 0, 0);
         this.store = store;
     }
 
@@ -48,14 +54,26 @@ public class SegmentDecoder extends Leng
         if (frame == null) {
             return null;
         }
+        int len = frame.readInt();
         byte type = frame.readByte();
         long msb = frame.readLong();
         long lsb = frame.readLong();
-        frame.discardReadBytes();
-        SegmentId id = new SegmentId(store.getTracker(), msb, lsb);
-        Segment s = new Segment(store.getTracker(), id, frame.nioBuffer());
-        log.debug("received type {} with id {} and size {}", type, id, s.size());
-        return s;
+        long hash = frame.readLong();
+        byte[] segment = new byte[len - 25];
+        frame.getBytes(29, segment);
+        Hasher hasher = Hashing.murmur3_32().newHasher();
+        long check = hasher.putBytes(segment).hash().padToLong();
+        if (hash == check) {
+            SegmentId id = new SegmentId(store.getTracker(), msb, lsb);
+            Segment s = new Segment(store.getTracker(), id,
+                    ByteBuffer.wrap(segment));
+            log.debug("received type {} with id {} and size {}", type, id,
+                    s.size());
+            return s;
+        }
+        log.debug("received corrupted segment {}, ignoring", new UUID(msb, lsb));
+        return null;
+
     }
 
     @Override

Modified: jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/codec/SegmentEncoder.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/codec/SegmentEncoder.java?rev=1607031&r1=1607030&r2=1607031&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/codec/SegmentEncoder.java (original)
+++ jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/codec/SegmentEncoder.java Tue Jul  1 10:17:22 2014
@@ -20,27 +20,36 @@
 package org.apache.jackrabbit.oak.plugins.segment.failover.codec;
 
 import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufOutputStream;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.handler.codec.MessageToByteEncoder;
 
+import java.io.ByteArrayOutputStream;
+
 import org.apache.jackrabbit.oak.plugins.segment.Segment;
 import org.apache.jackrabbit.oak.plugins.segment.SegmentId;
 
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
+
 public class SegmentEncoder extends MessageToByteEncoder<Segment> {
 
     @Override
     protected void encode(ChannelHandlerContext ctx, Segment s, ByteBuf out)
             throws Exception {
         SegmentId id = s.getSegmentId();
-        int len = s.size() + 17;
+        ByteArrayOutputStream baos = new ByteArrayOutputStream(s.size());
+        s.writeTo(baos);
+        byte[] segment = baos.toByteArray();
+
+        Hasher hasher = Hashing.murmur3_32().newHasher();
+        long hash = hasher.putBytes(segment).hash().padToLong();
+
+        int len = segment.length + 25;
         out.writeInt(len);
         out.writeByte(Messages.HEADER_SEGMENT);
         out.writeLong(id.getMostSignificantBits());
         out.writeLong(id.getLeastSignificantBits());
-        ByteBufOutputStream bout = new ByteBufOutputStream(out);
-        s.writeTo(bout);
-        bout.flush();
-        bout.close();
+        out.writeLong(hash);
+        out.writeBytes(segment);
     }
 }