You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by al...@apache.org on 2014/07/01 12:17:23 UTC
svn commit: r1607031 - in /jackrabbit/oak/trunk/oak-tarmk-failover: ./
src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/
src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/codec/
Author: alexparvulescu
Date: Tue Jul 1 10:17:22 2014
New Revision: 1607031
URL: http://svn.apache.org/r1607031
Log:
OAK-1915 TarMK failover 2.0
- added checksum verification to the segment transfer
Modified:
jackrabbit/oak/trunk/oak-tarmk-failover/README.md
jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/FailoverClient.java
jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/codec/SegmentDecoder.java
jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/codec/SegmentEncoder.java
Modified: jackrabbit/oak/trunk/oak-tarmk-failover/README.md
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-failover/README.md?rev=1607031&r1=1607030&r2=1607031&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-tarmk-failover/README.md (original)
+++ jackrabbit/oak/trunk/oak-tarmk-failover/README.md Tue Jul 1 10:17:22 2014
@@ -7,8 +7,8 @@ Failover
The component should be installed when failover support is needed.
The setup is expected to be: one master to one/many slave nodes.
-The slave will periodically poll the master for the head state over http
-on a custom port, if it changed, it should pull in all the new segments.
+The slave will periodically poll the master for the head state; if it has
+changed, the slave will pull in all the new segments since the last sync.
Setup in OSGi
-------------
@@ -24,7 +24,7 @@ Master host represents the master host i
Interval represents how often the sync thread should run, in seconds.
See examples in the osgi-conf folder for each run mode. To install a new OSGI config in the sling launcher,
-you only need to create a new folder called 'install' in the sling.home folder and copy the configs there.
+you only need to create a new folder called 'install' in the sling.home folder and copy the specific config there.
TODO
----
@@ -32,7 +32,6 @@ TODO
- timeout handling doesn't cover everything on both server and slave
- error handling on the slave still has some issues (the slave hangs)
- maybe enable compression of the segments over the wire
- - maybe add a checksum to the segment encoder/decoder to verify the integrity of the transfer
- slave runmode could possibly be a read-only mode (no writes permitted)
License
Modified: jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/FailoverClient.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/FailoverClient.java?rev=1607031&r1=1607030&r2=1607031&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/FailoverClient.java (original)
+++ jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/client/FailoverClient.java Tue Jul 1 10:17:22 2014
@@ -80,12 +80,6 @@ public final class FailoverClient implem
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
-
- // p.addLast(new LoggingHandler(LogLevel.INFO));
- // Enable stream compression
- // p.addLast(ZlibCodecFactory.newZlibEncoder(ZlibWrapper.GZIP));
- // p.addLast(ZlibCodecFactory.newZlibDecoder(ZlibWrapper.GZIP));
-
// WriteTimeoutHandler & ReadTimeoutHandler
p.addLast("readTimeoutHandler", new ReadTimeoutHandler(
readTimeoutMs, TimeUnit.MILLISECONDS));
Modified: jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/codec/SegmentDecoder.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/codec/SegmentDecoder.java?rev=1607031&r1=1607030&r2=1607031&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/codec/SegmentDecoder.java (original)
+++ jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/codec/SegmentDecoder.java Tue Jul 1 10:17:22 2014
@@ -23,12 +23,18 @@ import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+import java.nio.ByteBuffer;
+import java.util.UUID;
+
import org.apache.jackrabbit.oak.plugins.segment.Segment;
import org.apache.jackrabbit.oak.plugins.segment.SegmentId;
import org.apache.jackrabbit.oak.plugins.segment.SegmentStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
+
public class SegmentDecoder extends LengthFieldBasedFrameDecoder {
private static final Logger log = LoggerFactory
@@ -37,7 +43,7 @@ public class SegmentDecoder extends Leng
private final SegmentStore store;
public SegmentDecoder(SegmentStore store) {
- super(Segment.MAX_SEGMENT_SIZE + 21, 0, 4, 0, 4);
+ super(Segment.MAX_SEGMENT_SIZE + 21, 0, 4, 0, 0);
this.store = store;
}
@@ -48,14 +54,26 @@ public class SegmentDecoder extends Leng
if (frame == null) {
return null;
}
+ int len = frame.readInt();
byte type = frame.readByte();
long msb = frame.readLong();
long lsb = frame.readLong();
- frame.discardReadBytes();
- SegmentId id = new SegmentId(store.getTracker(), msb, lsb);
- Segment s = new Segment(store.getTracker(), id, frame.nioBuffer());
- log.debug("received type {} with id {} and size {}", type, id, s.size());
- return s;
+ long hash = frame.readLong();
+ byte[] segment = new byte[len - 25];
+ frame.getBytes(29, segment);
+ Hasher hasher = Hashing.murmur3_32().newHasher();
+ long check = hasher.putBytes(segment).hash().padToLong();
+ if (hash == check) {
+ SegmentId id = new SegmentId(store.getTracker(), msb, lsb);
+ Segment s = new Segment(store.getTracker(), id,
+ ByteBuffer.wrap(segment));
+ log.debug("received type {} with id {} and size {}", type, id,
+ s.size());
+ return s;
+ }
+ log.debug("received corrupted segment {}, ignoring", new UUID(msb, lsb));
+ return null;
+
}
@Override
Modified: jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/codec/SegmentEncoder.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/codec/SegmentEncoder.java?rev=1607031&r1=1607030&r2=1607031&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/codec/SegmentEncoder.java (original)
+++ jackrabbit/oak/trunk/oak-tarmk-failover/src/main/java/org/apache/jackrabbit/oak/plugins/segment/failover/codec/SegmentEncoder.java Tue Jul 1 10:17:22 2014
@@ -20,27 +20,36 @@
package org.apache.jackrabbit.oak.plugins.segment.failover.codec;
import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufOutputStream;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
+import java.io.ByteArrayOutputStream;
+
import org.apache.jackrabbit.oak.plugins.segment.Segment;
import org.apache.jackrabbit.oak.plugins.segment.SegmentId;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
+
public class SegmentEncoder extends MessageToByteEncoder<Segment> {
@Override
protected void encode(ChannelHandlerContext ctx, Segment s, ByteBuf out)
throws Exception {
SegmentId id = s.getSegmentId();
- int len = s.size() + 17;
+ ByteArrayOutputStream baos = new ByteArrayOutputStream(s.size());
+ s.writeTo(baos);
+ byte[] segment = baos.toByteArray();
+
+ Hasher hasher = Hashing.murmur3_32().newHasher();
+ long hash = hasher.putBytes(segment).hash().padToLong();
+
+ int len = segment.length + 25;
out.writeInt(len);
out.writeByte(Messages.HEADER_SEGMENT);
out.writeLong(id.getMostSignificantBits());
out.writeLong(id.getLeastSignificantBits());
- ByteBufOutputStream bout = new ByteBufOutputStream(out);
- s.writeTo(bout);
- bout.flush();
- bout.close();
+ out.writeLong(hash);
+ out.writeBytes(segment);
}
}