You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by al...@apache.org on 2014/12/11 17:06:20 UTC

svn commit: r1644689 [1/2] - in /jackrabbit/oak/trunk/oak-tarmk-standby: ./ src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/ src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/codec/ src/main/java/org/apache/jackrabb...

Author: alexparvulescu
Date: Thu Dec 11 16:06:19 2014
New Revision: 1644689

URL: http://svn.apache.org/r1644689
Log:
OAK-2347 TarMK Cold Standby FSDS mirroring

Added:
    jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/StandbyApplyDiff.java   (with props)
    jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/codec/BlobEncoder.java   (with props)
    jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/codec/IdArrayBasedBlob.java   (with props)
    jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/codec/ReplyDecoder.java   (with props)
    jackrabbit/oak/trunk/oak-tarmk-standby/src/test/java/org/apache/jackrabbit/oak/plugins/segment/standby/ExternalPrivateStoreIT.java   (with props)
    jackrabbit/oak/trunk/oak-tarmk-standby/src/test/java/org/apache/jackrabbit/oak/plugins/segment/standby/ExternalSharedStoreIT.java   (with props)
Removed:
    jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/SegmentPreLoaderHandler.java
Modified:
    jackrabbit/oak/trunk/oak-tarmk-standby/pom.xml
    jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/SegmentLoaderHandler.java
    jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/StandbyClient.java
    jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/StandbyClientHandler.java
    jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/codec/Messages.java
    jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/codec/SegmentDecoder.java
    jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/codec/SegmentReply.java
    jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/jmx/ObservablePartnerMBean.java   (contents, props changed)
    jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/jmx/StandbyStatusMBean.java   (contents, props changed)
    jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/server/StandbyServer.java
    jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/server/StandbyServerHandler.java
    jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/store/CommunicationObserver.java   (contents, props changed)
    jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/store/RemoteSegmentLoader.java
    jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/store/StandbyStoreService.java
    jackrabbit/oak/trunk/oak-tarmk-standby/src/test/java/org/apache/jackrabbit/oak/plugins/segment/standby/FailoverMultipleClientsTest.java   (contents, props changed)
    jackrabbit/oak/trunk/oak-tarmk-standby/src/test/resources/logback-test.xml

Modified: jackrabbit/oak/trunk/oak-tarmk-standby/pom.xml
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-standby/pom.xml?rev=1644689&r1=1644688&r2=1644689&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-tarmk-standby/pom.xml (original)
+++ jackrabbit/oak/trunk/oak-tarmk-standby/pom.xml Thu Dec 11 16:06:19 2014
@@ -141,6 +141,12 @@
       <version>${project.version}</version>
       <scope>provided</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.jackrabbit</groupId>
+      <artifactId>oak-blob</artifactId>
+      <version>${project.version}</version>
+      <scope>provided</scope>
+    </dependency>
 
     <dependency>
       <groupId>io.netty</groupId>
@@ -202,5 +208,11 @@
       <artifactId>logback-classic</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.jackrabbit</groupId>
+      <artifactId>jackrabbit-data</artifactId>
+      <version>2.9.0</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 </project>

Modified: jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/SegmentLoaderHandler.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/SegmentLoaderHandler.java?rev=1644689&r1=1644688&r2=1644689&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/SegmentLoaderHandler.java (original)
+++ jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/SegmentLoaderHandler.java Thu Dec 11 16:06:19 2014
@@ -18,6 +18,7 @@
  */
 package org.apache.jackrabbit.oak.plugins.segment.standby.client;
 
+import static org.apache.jackrabbit.oak.plugins.segment.standby.codec.Messages.newGetBlobReq;
 import static org.apache.jackrabbit.oak.plugins.segment.standby.codec.Messages.newGetSegmentReq;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
@@ -28,16 +29,17 @@ import java.io.IOException;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.jackrabbit.oak.api.Blob;
 import org.apache.jackrabbit.oak.plugins.segment.RecordId;
 import org.apache.jackrabbit.oak.plugins.segment.Segment;
 import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeBuilder;
 import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeState;
 import org.apache.jackrabbit.oak.plugins.segment.SegmentNotFoundException;
 import org.apache.jackrabbit.oak.plugins.segment.standby.codec.SegmentReply;
-import org.apache.jackrabbit.oak.plugins.segment.standby.store.StandbyStore;
 import org.apache.jackrabbit.oak.plugins.segment.standby.store.RemoteSegmentLoader;
-import org.apache.jackrabbit.oak.spi.state.ApplyDiff;
+import org.apache.jackrabbit.oak.plugins.segment.standby.store.StandbyStore;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -50,24 +52,23 @@ public class SegmentLoaderHandler extend
     private final StandbyStore store;
     private final String clientID;
     private final RecordId head;
-    private final EventExecutorGroup preloaderExecutor;
     private final EventExecutorGroup loaderExecutor;
+    private final AtomicBoolean running;
 
-    private int timeoutMs = 5000;
+    private int timeoutMs = 120000;
 
     private ChannelHandlerContext ctx;
 
-    final BlockingQueue<Segment> segment = new LinkedBlockingQueue<Segment>();
+    final BlockingQueue<SegmentReply> segment = new LinkedBlockingQueue<SegmentReply>();
 
     public SegmentLoaderHandler(final StandbyStore store, RecordId head,
-            EventExecutorGroup preloaderExecutor,
             EventExecutorGroup loaderExecutor,
-            String clientID) {
+            String clientID, AtomicBoolean running) {
         this.store = store;
         this.head = head;
-        this.preloaderExecutor = preloaderExecutor;
         this.loaderExecutor = loaderExecutor;
         this.clientID = clientID;
+        this.running = running;
     }
 
     @Override
@@ -80,8 +81,7 @@ public class SegmentLoaderHandler extend
     public void userEventTriggered(ChannelHandlerContext ctx, Object evt)
             throws Exception {
         if (evt instanceof SegmentReply) {
-            //log.debug("offering segment " + ((SegmentReply) evt).getSegment());
-            segment.offer(((SegmentReply) evt).getSegment());
+            segment.offer((SegmentReply) evt);
         }
     }
 
@@ -97,10 +97,10 @@ public class SegmentLoaderHandler extend
             SegmentNodeState current = new SegmentNodeState(head);
             do {
                 try {
-                    current.compareAgainstBaseState(before, new ApplyDiff(builder));
+                    current.compareAgainstBaseState(before,
+                            new StandbyApplyDiff(builder, store, this));
                     break;
-                }
-                catch (SegmentNotFoundException e) {
+                } catch (SegmentNotFoundException e) {
                     // the segment is locally damaged or not present anymore
                     // lets try to read this from the primary again
                     String id = e.getSegmentId();
@@ -114,8 +114,7 @@ public class SegmentLoaderHandler extend
                     ByteArrayOutputStream bout = new ByteArrayOutputStream(s.size());
                     try {
                         s.writeTo(bout);
-                    }
-                    catch (IOException f) {
+                    } catch (IOException f) {
                         log.error("can't wrap segment to output stream", f);
                         throw e;
                     }
@@ -137,25 +136,50 @@ public class SegmentLoaderHandler extend
     }
 
     @Override
+    public Blob readBlob(String blobId) {
+        ctx.writeAndFlush(newGetBlobReq(this.clientID, blobId));
+        return getBlob(blobId);
+    }
+
+    @Override
     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
             throws Exception {
-        log.warn("Closing channel. Got exception: " + cause);
-        ctx.close();
+        log.error("Exception caught, closing channel.", cause);
+        close();
+    }
+
+    private Segment getSegment(final String id) {
+        return getReply(id, SegmentReply.SEGMENT).getSegment();
     }
 
-    // implementation of RemoteSegmentLoader
+    private Blob getBlob(final String id) {
+        return getReply(id, SegmentReply.BLOB).getBlob();
+    }
 
-    public Segment getSegment(final String id) {
+    private SegmentReply getReply(final String id, int type) {
         boolean interrupted = false;
         try {
             for (;;) {
                 try {
-                    Segment s = segment.poll(timeoutMs, TimeUnit.MILLISECONDS);
-                    if (s == null) {
-                        return null;
+                    SegmentReply r = segment.poll(timeoutMs, TimeUnit.MILLISECONDS);
+                    if (r == null) {
+                        log.warn("timeout waiting for {}", id);
+                        return SegmentReply.empty();
                     }
-                    if (s.getSegmentId().toString().equals(id)) {
-                        return s;
+                    if (r.getType() == type) {
+                        switch (r.getType()) {
+                        case SegmentReply.SEGMENT:
+                            if (r.getSegment().getSegmentId().toString()
+                                    .equals(id)) {
+                                return r;
+                            }
+                            break;
+                        case SegmentReply.BLOB:
+                            if (r.getBlob().getBlobId().equals(id)) {
+                                return r;
+                            }
+                            break;
+                        }
                     }
                 } catch (InterruptedException ignore) {
                     interrupted = true;
@@ -166,24 +190,26 @@ public class SegmentLoaderHandler extend
                 Thread.currentThread().interrupt();
             }
         }
-
     }
 
+    @Override
     public void close() {
         ctx.close();
-        if (preloaderExecutor != null && !preloaderExecutor.isShuttingDown()) {
-            preloaderExecutor.shutdownGracefully(1, 2, TimeUnit.SECONDS)
-                    .syncUninterruptibly();
-        }
         if (loaderExecutor != null && !loaderExecutor.isShuttingDown()) {
             loaderExecutor.shutdownGracefully(1, 2, TimeUnit.SECONDS)
                     .syncUninterruptibly();
         }
     }
 
+    @Override
     public boolean isClosed() {
         return (loaderExecutor != null && (loaderExecutor.isShuttingDown() || loaderExecutor
                 .isShutdown()));
     }
 
+    @Override
+    public boolean isRunning() {
+        return running.get();
+    }
+
 }

Added: jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/StandbyApplyDiff.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/StandbyApplyDiff.java?rev=1644689&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/StandbyApplyDiff.java (added)
+++ jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/StandbyApplyDiff.java Thu Dec 11 16:06:19 2014
@@ -0,0 +1,156 @@
+package org.apache.jackrabbit.oak.plugins.segment.standby.client;
+
+import static org.apache.jackrabbit.oak.api.Type.BINARIES;
+import static org.apache.jackrabbit.oak.api.Type.BINARY;
+
+import java.io.IOException;
+
+import org.apache.jackrabbit.oak.api.Blob;
+import org.apache.jackrabbit.oak.api.PropertyState;
+import org.apache.jackrabbit.oak.api.Type;
+import org.apache.jackrabbit.oak.commons.PathUtils;
+import org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState;
+import org.apache.jackrabbit.oak.plugins.segment.SegmentBlob;
+import org.apache.jackrabbit.oak.plugins.segment.SegmentStore;
+import org.apache.jackrabbit.oak.plugins.segment.standby.store.RemoteSegmentLoader;
+import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
+import org.apache.jackrabbit.oak.spi.state.NodeState;
+import org.apache.jackrabbit.oak.spi.state.NodeStateDiff;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class StandbyApplyDiff implements NodeStateDiff {
+
+    private static final Logger log = LoggerFactory
+            .getLogger(StandbyApplyDiff.class);
+
+    private final NodeBuilder builder;
+
+    private final SegmentStore store;
+
+    private final RemoteSegmentLoader loader;
+
+    private final String path;
+
+    public StandbyApplyDiff(NodeBuilder builder, SegmentStore store,
+            RemoteSegmentLoader loader) {
+        this(builder, store, loader, "/");
+    }
+
+    private StandbyApplyDiff(NodeBuilder builder, SegmentStore store,
+            RemoteSegmentLoader loader, String path) {
+        this.builder = builder;
+        this.store = store;
+        this.loader = loader;
+        this.path = path;
+        if (log.isTraceEnabled()) {
+            if (PathUtils.getDepth(path) < 5) {
+                log.trace("running diff on {}", path);
+            }
+        }
+    }
+
+    @Override
+    public boolean propertyAdded(PropertyState after) {
+        if (!loader.isRunning()) {
+            return false;
+        }
+        builder.setProperty(binaryCheck(after));
+        return true;
+    }
+
+    @Override
+    public boolean propertyChanged(PropertyState before, PropertyState after) {
+        if (!loader.isRunning()) {
+            return false;
+        }
+        builder.setProperty(binaryCheck(after));
+        return true;
+    }
+
+    @Override
+    public boolean propertyDeleted(PropertyState before) {
+        if (!loader.isRunning()) {
+            return false;
+        }
+        builder.removeProperty(before.getName());
+        return true;
+    }
+
+    private PropertyState binaryCheck(PropertyState property) {
+        Type<?> type = property.getType();
+        if (type == BINARY) {
+            binaryCheck(property.getValue(Type.BINARY), property.getName());
+        } else if (type == BINARIES) {
+            for (Blob blob : property.getValue(BINARIES)) {
+                binaryCheck(blob, property.getName());
+            }
+        }
+        return property;
+    }
+
+    private void binaryCheck(Blob b, String pName) {
+        if (b instanceof SegmentBlob) {
+            SegmentBlob sb = (SegmentBlob) b;
+            // verify if the blob exists
+            if (sb.isExternal() && b.getReference() == null) {
+                String blobId = sb.getBlobId();
+                if (blobId != null) {
+                    readBlob(blobId, pName);
+                }
+            }
+        } else {
+            log.warn("Unknown Blob {} at {}, ignoring", b.getClass().getName(),
+                    path + "#" + pName);
+        }
+    }
+
+    private void readBlob(String blobId, String pName) {
+        Blob read = loader.readBlob(blobId);
+        if (read != null) {
+            try {
+                store.getBlobStore().writeBlob(read.getNewStream());
+            } catch (IOException f) {
+                throw new IllegalStateException("Unable to persist blob "
+                        + blobId + " at " + path + "#" + pName, f);
+            }
+        } else {
+            throw new IllegalStateException("Unable to load remote blob "
+                    + blobId + " at " + path + "#" + pName);
+        }
+    }
+
+    @Override
+    public boolean childNodeAdded(String name, NodeState after) {
+        if (!loader.isRunning()) {
+            return false;
+        }
+        NodeBuilder child = EmptyNodeState.EMPTY_NODE.builder();
+        boolean success = EmptyNodeState.compareAgainstEmptyState(after,
+                new StandbyApplyDiff(child, store, loader, path + name + "/"));
+        if (success) {
+            builder.setChildNode(name, child.getNodeState());
+        }
+        return success;
+    }
+
+    @Override
+    public boolean childNodeChanged(String name, NodeState before,
+            NodeState after) {
+        if (!loader.isRunning()) {
+            return false;
+        }
+
+        return after.compareAgainstBaseState(before, new StandbyApplyDiff(
+                builder.getChildNode(name), store, loader, path + name + "/"));
+    }
+
+    @Override
+    public boolean childNodeDeleted(String name, NodeState before) {
+        if (!loader.isRunning()) {
+            return false;
+        }
+        builder.getChildNode(name).remove();
+        return true;
+    }
+}

Propchange: jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/StandbyApplyDiff.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/StandbyClient.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/StandbyClient.java?rev=1644689&r1=1644688&r2=1644689&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/StandbyClient.java (original)
+++ jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/StandbyClient.java Thu Dec 11 16:06:19 2014
@@ -40,6 +40,7 @@ import java.io.Closeable;
 import java.lang.management.ManagementFactory;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.jackrabbit.oak.plugins.segment.SegmentStore;
 import org.apache.jackrabbit.oak.plugins.segment.standby.codec.RecordIdDecoder;
@@ -72,12 +73,13 @@ public final class StandbyClient impleme
     private EventExecutorGroup executor;
     private SslContext sslContext;
     private boolean active = false;
-    private boolean running;
     private int failedRequests;
     private long lastSuccessfulRequest;
     private volatile String state;
     private final Object sync = new Object();
 
+    private final AtomicBoolean running = new AtomicBoolean(true);
+
     public StandbyClient(String host, int port, SegmentStore store) throws SSLException {
         this(host, port, store, false);
     }
@@ -124,6 +126,10 @@ public final class StandbyClient impleme
     }
 
     public void run() {
+        if (!isRunning()) {
+            // manually stopped
+            return;
+        }
 
         Bootstrap b;
         synchronized (this.sync) {
@@ -132,7 +138,7 @@ public final class StandbyClient impleme
             }
             state = STATUS_STARTING;
             executor = new DefaultEventExecutorGroup(4);
-            handler = new StandbyClientHandler(this.store, executor, this.observer);
+            handler = new StandbyClientHandler(this.store, executor, this.observer, this.running);
             group = new NioEventLoopGroup();
 
             b = new Bootstrap();
@@ -160,7 +166,6 @@ public final class StandbyClient impleme
                 }
             });
             state = STATUS_RUNNING;
-            this.running = true;
             this.active = true;
         }
 
@@ -200,20 +205,19 @@ public final class StandbyClient impleme
     }
 
     @Override
-    public boolean isRunning() { return running;}
+    public boolean isRunning() {
+        return running.get();
+    }
 
     @Override
     public void start() {
-        if (!running) run();
+        running.set(true);
     }
 
     @Override
     public void stop() {
-        //TODO running flag doesn't make sense this way, since run() is usually scheduled to be called repeatedly.
-        if (running) {
-            running = false;
-            state = STATUS_STOPPED;
-        }
+        running.set(false);
+        state = STATUS_STOPPED;
     }
 
     @Override

Modified: jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/StandbyClientHandler.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/StandbyClientHandler.java?rev=1644689&r1=1644688&r2=1644689&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/StandbyClientHandler.java (original)
+++ jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/StandbyClientHandler.java Thu Dec 11 16:06:19 2014
@@ -26,17 +26,18 @@ import io.netty.util.concurrent.EventExe
 
 import java.io.Closeable;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.jackrabbit.oak.plugins.segment.RecordId;
 import org.apache.jackrabbit.oak.plugins.segment.standby.codec.RecordIdDecoder;
-import org.apache.jackrabbit.oak.plugins.segment.standby.codec.SegmentDecoder;
+import org.apache.jackrabbit.oak.plugins.segment.standby.codec.ReplyDecoder;
 import org.apache.jackrabbit.oak.plugins.segment.standby.store.CommunicationObserver;
 import org.apache.jackrabbit.oak.plugins.segment.standby.store.StandbyStore;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class StandbyClientHandler extends
-        SimpleChannelInboundHandler<RecordId> implements Closeable {
+public class StandbyClientHandler extends SimpleChannelInboundHandler<RecordId>
+        implements Closeable {
 
     private static final Logger log = LoggerFactory
             .getLogger(StandbyClientHandler.class);
@@ -44,16 +45,18 @@ public class StandbyClientHandler extend
     private final StandbyStore store;
     private final EventExecutorGroup executor;
     private final CommunicationObserver observer;
-    private EventExecutorGroup preloaderExecutor;
-    private EventExecutorGroup loaderExecutor;
+    private final AtomicBoolean running;
 
+    private EventExecutorGroup loaderExecutor;
     private ChannelHandlerContext ctx;
 
     public StandbyClientHandler(final StandbyStore store,
-            EventExecutorGroup executor, CommunicationObserver observer) {
+            EventExecutorGroup executor, CommunicationObserver observer,
+            AtomicBoolean running) {
         this.store = store;
         this.executor = executor;
         this.observer = observer;
+        this.running = running;
     }
 
     @Override
@@ -87,33 +90,31 @@ public class StandbyClientHandler extend
         log.debug("updating current head to " + head);
         ctx.pipeline().remove(RecordIdDecoder.class);
         ctx.pipeline().remove(this);
-        ctx.pipeline().addLast(new SegmentDecoder(store));
-
-        preloaderExecutor = new DefaultEventExecutorGroup(4);
-        SegmentPreLoaderHandler h1 = new SegmentPreLoaderHandler();
-        ctx.pipeline().addLast(preloaderExecutor, h1);
+        ctx.pipeline().addLast(new ReplyDecoder(store));
 
         loaderExecutor = new DefaultEventExecutorGroup(4);
         SegmentLoaderHandler h2 = new SegmentLoaderHandler(store, head,
-                preloaderExecutor, loaderExecutor, this.observer.getID());
+                loaderExecutor, this.observer.getID(), running);
         ctx.pipeline().addLast(loaderExecutor, h2);
 
-        h1.channelActive(ctx);
         h2.channelActive(ctx);
         log.debug("updating current head finished");
     }
 
     @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
+            throws Exception {
+        log.error("Exception caught, closing channel.", cause);
+        close();
+    }
+
+    @Override
     public void close() {
         ctx.close();
         if (!executor.isShuttingDown()) {
             executor.shutdownGracefully(1, 2, TimeUnit.SECONDS)
                     .syncUninterruptibly();
         }
-        if (preloaderExecutor != null && !preloaderExecutor.isShuttingDown()) {
-            preloaderExecutor.shutdownGracefully(1, 2, TimeUnit.SECONDS)
-                    .syncUninterruptibly();
-        }
         if (loaderExecutor != null && !loaderExecutor.isShuttingDown()) {
             loaderExecutor.shutdownGracefully(1, 2, TimeUnit.SECONDS)
                     .syncUninterruptibly();

Added: jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/codec/BlobEncoder.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/codec/BlobEncoder.java?rev=1644689&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/codec/BlobEncoder.java (added)
+++ jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/codec/BlobEncoder.java Thu Dec 11 16:06:19 2014
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.segment.standby.codec;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.MessageToByteEncoder;
+
+import java.io.InputStream;
+import java.nio.charset.Charset;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.jackrabbit.oak.api.Blob;
+
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
+
+public class BlobEncoder extends MessageToByteEncoder<Blob> {
+
+    // TODO
+    // if transferring large binaries turns out to be too intensive look into
+    // using a ChunkedWriteHandler and a new ChunkedStream(Blob.getNewStream())
+
+    @Override
+    protected void encode(ChannelHandlerContext ctx, Blob b, ByteBuf out)
+            throws Exception {
+        byte[] bytes = null;
+        InputStream s = b.getNewStream();
+        try {
+            bytes = IOUtils.toByteArray(s);
+        } finally {
+            s.close();
+        }
+
+        Hasher hasher = Hashing.murmur3_32().newHasher();
+        long hash = hasher.putBytes(bytes).hash().padToLong();
+
+        out.writeInt(bytes.length);
+        out.writeByte(Messages.HEADER_BLOB);
+
+        String bid = b.getContentIdentity();
+        byte[] id = bid.getBytes(Charset.forName("UTF-8"));
+        out.writeInt(id.length);
+        out.writeBytes(id);
+
+        out.writeLong(hash);
+        out.writeBytes(bytes);
+    }
+}

Propchange: jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/codec/BlobEncoder.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/codec/IdArrayBasedBlob.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/codec/IdArrayBasedBlob.java?rev=1644689&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/codec/IdArrayBasedBlob.java (added)
+++ jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/codec/IdArrayBasedBlob.java Thu Dec 11 16:06:19 2014
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.segment.standby.codec;
+
+import org.apache.jackrabbit.oak.plugins.memory.ArrayBasedBlob;
+
+public class IdArrayBasedBlob extends ArrayBasedBlob {
+
+    private final String blobId;
+
+    public IdArrayBasedBlob(byte[] value, String blobId) {
+        super(value);
+        this.blobId = blobId;
+    }
+
+    public String getBlobId() {
+        return blobId;
+    }
+}

Propchange: jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/codec/IdArrayBasedBlob.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/codec/Messages.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/codec/Messages.java?rev=1644689&r1=1644688&r2=1644689&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/codec/Messages.java (original)
+++ jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/codec/Messages.java Thu Dec 11 16:06:19 2014
@@ -22,9 +22,11 @@ public class Messages {
 
     public static final byte HEADER_RECORD = 0x00;
     public static final byte HEADER_SEGMENT = 0x01;
+    public static final byte HEADER_BLOB = 0x02;
 
     public static final String GET_HEAD = "h";
     public static final String GET_SEGMENT = "s.";
+    public static final String GET_BLOB = "b.";
 
     private static final String MAGIC = "Standby-CMD@";
     private static final String SEPARATOR = ":";
@@ -41,6 +43,10 @@ public class Messages {
         return newRequest(clientID, GET_SEGMENT + sid);
     }
 
+    public static String newGetBlobReq(String clientID, String blobId) {
+        return newRequest(clientID, GET_BLOB + blobId);
+    }
+
     public static String extractMessageFrom(String payload) {
         if (payload.startsWith(MAGIC) && payload.length() > MAGIC.length()) {
             int i = payload.indexOf(SEPARATOR);

Added: jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/codec/ReplyDecoder.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/codec/ReplyDecoder.java?rev=1644689&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/codec/ReplyDecoder.java (added)
+++ jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/codec/ReplyDecoder.java Thu Dec 11 16:06:19 2014
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.segment.standby.codec;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.ReplayingDecoder;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.jackrabbit.oak.plugins.segment.Segment;
+import org.apache.jackrabbit.oak.plugins.segment.SegmentId;
+import org.apache.jackrabbit.oak.plugins.segment.SegmentStore;
+import org.apache.jackrabbit.oak.plugins.segment.standby.codec.ReplyDecoder.DecodingState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
+
+public class ReplyDecoder extends ReplayingDecoder<DecodingState> {
+
+    public enum DecodingState {
+        HEADER, SEGMENT, BLOB
+    }
+
+    private static final Logger log = LoggerFactory
+            .getLogger(ReplyDecoder.class);
+
+    private final SegmentStore store;
+
+    private int length = -1;
+    private byte type = -1;
+
+    public ReplyDecoder(SegmentStore store) {
+        super(DecodingState.HEADER);
+        this.store = store;
+    }
+
+    private void reset() {
+        checkpoint(DecodingState.HEADER);
+        length = -1;
+        type = -1;
+    }
+
+    @Override
+    protected void decode(ChannelHandlerContext ctx, ByteBuf in,
+            List<Object> out) throws Exception {
+
+        switch (state()) {
+        case HEADER: {
+            length = in.readInt();
+            type = in.readByte();
+            switch (type) {
+            case Messages.HEADER_SEGMENT:
+                checkpoint(DecodingState.SEGMENT);
+                break;
+            case Messages.HEADER_BLOB:
+                checkpoint(DecodingState.BLOB);
+                break;
+            default:
+                throw new Exception("Unknown type: " + type);
+            }
+            return;
+        }
+
+        case SEGMENT: {
+            Segment s = decodeSegment(in, length, type);
+            if (s != null) {
+                out.add(SegmentReply.empty());
+                ctx.fireUserEventTriggered(new SegmentReply(s));
+                reset();
+            }
+            return;
+        }
+
+        case BLOB: {
+            IdArrayBasedBlob b = decodeBlob(in, length, type);
+            if (b != null) {
+                out.add(SegmentReply.empty());
+                ctx.fireUserEventTriggered(new SegmentReply(b));
+                reset();
+            }
+            return;
+        }
+
+        default:
+            throw new Exception("Unknown decoding state: " + state());
+        }
+    }
+
+    private Segment decodeSegment(ByteBuf in, int len, byte type) {
+        long msb = in.readLong();
+        long lsb = in.readLong();
+        long hash = in.readLong();
+        byte[] segment = new byte[len - 25];
+        in.readBytes(segment);
+        Hasher hasher = Hashing.murmur3_32().newHasher();
+        long check = hasher.putBytes(segment).hash().padToLong();
+        if (hash == check) {
+            SegmentId id = new SegmentId(store.getTracker(), msb, lsb);
+            Segment s = new Segment(store.getTracker(), id,
+                    ByteBuffer.wrap(segment));
+            log.debug("received segment with id {} and size {}", id, s.size());
+            return s;
+        }
+        log.debug("received corrupted segment {}, ignoring", new UUID(msb, lsb));
+        return null;
+    }
+
+    private IdArrayBasedBlob decodeBlob(ByteBuf in, int length, byte type) {
+        int inIdLen = in.readInt();
+        byte[] bid = new byte[inIdLen];
+        in.readBytes(bid);
+        String id = new String(bid, Charset.forName("UTF-8"));
+
+        long hash = in.readLong();
+        byte[] blob = new byte[length];
+        in.readBytes(blob);
+        Hasher hasher = Hashing.murmur3_32().newHasher();
+        long check = hasher.putBytes(blob).hash().padToLong();
+        if (hash == check) {
+            log.debug("received blob with id {} and size {}", id, blob.length);
+            return new IdArrayBasedBlob(blob, id);
+        }
+        log.debug("received corrupted binary {}, ignoring", id);
+        return null;
+    }
+
+}

Propchange: jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/codec/ReplyDecoder.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/codec/SegmentDecoder.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/codec/SegmentDecoder.java?rev=1644689&r1=1644688&r2=1644689&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/codec/SegmentDecoder.java (original)
+++ jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/codec/SegmentDecoder.java Thu Dec 11 16:06:19 2014
@@ -36,6 +36,7 @@ import org.slf4j.LoggerFactory;
 import com.google.common.hash.Hasher;
 import com.google.common.hash.Hashing;
 
+@Deprecated
 public class SegmentDecoder extends LengthFieldBasedFrameDecoder {
 
     private static final Logger log = LoggerFactory

Modified: jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/codec/SegmentReply.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/codec/SegmentReply.java?rev=1644689&r1=1644688&r2=1644689&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/codec/SegmentReply.java (original)
+++ jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/codec/SegmentReply.java Thu Dec 11 16:06:19 2014
@@ -20,14 +20,47 @@ import org.apache.jackrabbit.oak.plugins
 
 public class SegmentReply {
 
+    public static final int SEGMENT = 0;
+    public static final int BLOB = 1;
+
+    public static SegmentReply empty() {
+        return new SegmentReply();
+    }
+
+    private final int type;
+
     private final Segment segment;
 
+    private final IdArrayBasedBlob blob;
+
     public SegmentReply(Segment segment) {
+        this.type = SEGMENT;
         this.segment = segment;
+        this.blob = null;
+    }
+
+    public SegmentReply(IdArrayBasedBlob blob) {
+        this.type = BLOB;
+        this.segment = null;
+        this.blob = blob;
+    }
+
+    private SegmentReply() {
+        this.type = -1;
+        this.segment = null;
+        this.blob = null;
     }
 
     public Segment getSegment() {
         return this.segment;
     }
 
+    public IdArrayBasedBlob getBlob() {
+        return blob;
+    }
+
+    public int getType() {
+        return type;
+    }
+
 }

Modified: jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/jmx/ObservablePartnerMBean.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/jmx/ObservablePartnerMBean.java?rev=1644689&r1=1644688&r2=1644689&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/jmx/ObservablePartnerMBean.java (original)
+++ jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/jmx/ObservablePartnerMBean.java Thu Dec 11 16:06:19 2014
@@ -1,48 +1,55 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.jackrabbit.oak.plugins.segment.standby.jmx;
-
-import org.apache.jackrabbit.oak.commons.jmx.Description;
-
-import javax.annotation.CheckForNull;
-import javax.annotation.Nonnull;
-
-public interface ObservablePartnerMBean {
-
-    @Nonnull
-    @Description("name of the partner")
-    String getName();
-
-    @Description("IP of the remote")
-    String getRemoteAddress();
-
-    @Description("Last request")
-    String getLastRequest();
-
-    @Description("Port of the remote")
-    int getRemotePort();
-
-    @CheckForNull
-    @Description("Time the remote instance was last contacted")
-    String getLastSeenTimestamp();
-
-    @Description("Number of transferred segments")
-    long getTransferredSegments();
-
-    @Description("Number of bytes stored in transferred segments")
-    long getTransferredSegmentBytes();
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.segment.standby.jmx;
+
+import org.apache.jackrabbit.oak.commons.jmx.Description;
+
+import javax.annotation.CheckForNull;
+import javax.annotation.Nonnull;
+
+public interface ObservablePartnerMBean {
+
+    @Nonnull
+    @Description("name of the partner")
+    String getName();
+
+    @Description("IP of the remote")
+    String getRemoteAddress();
+
+    @Description("Last request")
+    String getLastRequest();
+
+    @Description("Port of the remote")
+    int getRemotePort();
+
+    @CheckForNull
+    @Description("Time the remote instance was last contacted")
+    String getLastSeenTimestamp();
+
+    @Description("Number of transferred segments")
+    long getTransferredSegments();
+
+    @Description("Number of bytes stored in transferred segments")
+    long getTransferredSegmentBytes();
+
+    @Description("Number of transferred binaries")
+    long getTransferredBinaries();
+
+    @Description("Number of bytes stored in transferred binaries")
+    long getTransferredBinariesBytes();
+
+}

Propchange: jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/jmx/ObservablePartnerMBean.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/jmx/StandbyStatusMBean.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/jmx/StandbyStatusMBean.java?rev=1644689&r1=1644688&r2=1644689&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/jmx/StandbyStatusMBean.java (original)
+++ jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/jmx/StandbyStatusMBean.java Thu Dec 11 16:06:19 2014
@@ -1,47 +1,47 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.jackrabbit.oak.plugins.segment.standby.jmx;
-
-import org.apache.jackrabbit.oak.commons.jmx.Description;
-import javax.annotation.Nonnull;
-
-public interface StandbyStatusMBean {
-    public static final String JMX_NAME = "org.apache.jackrabbit.oak:name=Status,type=\"Standby\"";
-    public static final String STATUS_INITIALIZING = "initializing";
-    public static final String STATUS_STOPPED = "stopped";
-    public static final String STATUS_STARTING = "starting";
-    public static final String STATUS_RUNNING = "running";
-    public static final String STATUS_CLOSING = "closing";
-    public static final String STATUS_CLOSED = "closed";
-
-    @Nonnull
-    @Description("primary or standby")
-    String getMode();
-
-    @Description("current status of the service")
-    String getStatus();
-
-    @Description("instance is running")
-    boolean isRunning();
-
-    @Description("stop the communication")
-    void stop();
-
-    @Description("start the communication")
-    void start();
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.segment.standby.jmx;
+
+import org.apache.jackrabbit.oak.commons.jmx.Description;
+import javax.annotation.Nonnull;
+
+public interface StandbyStatusMBean {
+    public static final String JMX_NAME = "org.apache.jackrabbit.oak:name=Status,type=\"Standby\"";
+    public static final String STATUS_INITIALIZING = "initializing";
+    public static final String STATUS_STOPPED = "stopped";
+    public static final String STATUS_STARTING = "starting";
+    public static final String STATUS_RUNNING = "running";
+    public static final String STATUS_CLOSING = "closing";
+    public static final String STATUS_CLOSED = "closed";
+
+    @Nonnull
+    @Description("primary or standby")
+    String getMode();
+
+    @Description("current status of the service")
+    String getStatus();
+
+    @Description("instance is running")
+    boolean isRunning();
+
+    @Description("stop the communication")
+    void stop();
+
+    @Description("start the communication")
+    void start();
+}

Propchange: jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/jmx/StandbyStatusMBean.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/server/StandbyServer.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/server/StandbyServer.java?rev=1644689&r1=1644688&r2=1644689&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/server/StandbyServer.java (original)
+++ jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/server/StandbyServer.java Thu Dec 11 16:06:19 2014
@@ -20,6 +20,7 @@ package org.apache.jackrabbit.oak.plugin
 
 import io.netty.bootstrap.ServerBootstrap;
 import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelOption;
 import io.netty.channel.ChannelPipeline;
@@ -33,15 +34,15 @@ import io.netty.handler.codec.string.Str
 import io.netty.handler.ssl.SslContext;
 import io.netty.handler.ssl.util.SelfSignedCertificate;
 import io.netty.util.CharsetUtil;
+import io.netty.util.concurrent.Future;
 
 import java.io.Closeable;
 import java.lang.management.ManagementFactory;
 import java.security.cert.CertificateException;
 import java.util.concurrent.TimeUnit;
 
-import io.netty.util.concurrent.Future;
-
 import org.apache.jackrabbit.oak.plugins.segment.SegmentStore;
+import org.apache.jackrabbit.oak.plugins.segment.standby.codec.BlobEncoder;
 import org.apache.jackrabbit.oak.plugins.segment.standby.codec.RecordIdEncoder;
 import org.apache.jackrabbit.oak.plugins.segment.standby.codec.SegmentEncoder;
 import org.apache.jackrabbit.oak.plugins.segment.standby.jmx.StandbyStatusMBean;
@@ -129,6 +130,7 @@ public class StandbyServer implements St
                 p.addLast(new SnappyFramedEncoder());
                 p.addLast(new RecordIdEncoder());
                 p.addLast(new SegmentEncoder());
+                p.addLast(new BlobEncoder());
                 p.addLast(handler);
             }
         });
@@ -169,12 +171,31 @@ public class StandbyServer implements St
             public void run() {
                 try {
                     running = true;
+                    handler.state = STATUS_RUNNING;
                     channelFuture.sync().channel().closeFuture().sync();
                 } catch (InterruptedException e) {
                     StandbyServer.this.stop();
                 }
             }
         };
+        final ChannelFutureListener bindListener = new ChannelFutureListener() {
+            @Override
+            public void operationComplete(ChannelFuture future) {
+                if (future.isSuccess()) {
+                    close.start();
+                } else {
+                    log.error("Server failed to start, will be canceled",
+                            future.cause());
+                    future.channel().close();
+                    new Thread() {
+                        @Override
+                        public void run() {
+                            close();
+                        }
+                    }.start();
+                }
+            }
+        };
         Future<?> startup = bossGroup.submit(new Runnable() {
             @Override
             public void run() {
@@ -184,7 +205,7 @@ public class StandbyServer implements St
                 //the channel registration synchronous.
                 //Note that now this method will return immediately.
                 channelFuture = b.bind(port);
-                close.start();
+                channelFuture.addListener(bindListener);
             }
         });
         if (!startup.awaitUninterruptibly(10000)) {

Modified: jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/server/StandbyServerHandler.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/server/StandbyServerHandler.java?rev=1644689&r1=1644688&r2=1644689&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/server/StandbyServerHandler.java (original)
+++ jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/server/StandbyServerHandler.java Thu Dec 11 16:06:19 2014
@@ -29,6 +29,7 @@ import io.netty.channel.ChannelHandler.S
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.SimpleChannelInboundHandler;
 
+import org.apache.jackrabbit.oak.api.Blob;
 import org.apache.jackrabbit.oak.plugins.segment.RecordId;
 import org.apache.jackrabbit.oak.plugins.segment.Segment;
 import org.apache.jackrabbit.oak.plugins.segment.SegmentId;
@@ -158,7 +159,7 @@ public class StandbyServerHandler extend
                     } catch (IllegalStateException e) {
                         // segment not found
                         log.debug("waiting for segment. Got exception: " + e.getMessage());
-                        TimeUnit.MILLISECONDS.sleep(1000);
+                        TimeUnit.MILLISECONDS.sleep(2000);
                     }
                     if (s != null) break;
                 }
@@ -169,6 +170,15 @@ public class StandbyServerHandler extend
                     observer.didSendSegmentBytes(clientID, s.size());
                     return;
                 }
+            } else if (request.startsWith(Messages.GET_BLOB)) {
+                String bid = request.substring(Messages.GET_BLOB.length());
+                log.debug("request blob id {}", bid);
+                Blob b = store.readBlob(bid);
+                log.debug("sending blob " + bid + " to " + client);
+                ctx.writeAndFlush(b);
+                observer.didSendBinariesBytes(clientID,
+                        Math.max(0, (int) b.length()));
+                return;
             } else {
                 log.warn("Unknown request {}, ignoring.", request);
             }

Modified: jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/store/CommunicationObserver.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/store/CommunicationObserver.java?rev=1644689&r1=1644688&r2=1644689&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/store/CommunicationObserver.java (original)
+++ jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/store/CommunicationObserver.java Thu Dec 11 16:06:19 2014
@@ -1,179 +1,199 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.jackrabbit.oak.plugins.segment.standby.store;
-
-
-import org.apache.jackrabbit.oak.plugins.segment.standby.jmx.StandbyStatusMBean;
-import org.apache.jackrabbit.oak.plugins.segment.standby.jmx.ObservablePartnerMBean;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.management.MBeanServer;
-import javax.management.MalformedObjectNameException;
-import javax.management.ObjectName;
-import javax.management.StandardMBean;
-
-import java.lang.management.ManagementFactory;
-import java.net.InetSocketAddress;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Map;
-
-public class CommunicationObserver {
-    private static final int MAX_CLIENT_STATISTICS = 10;
-
-    private class CommunicationPartnerMBean implements ObservablePartnerMBean {
-        private final ObjectName mbeanName;
-        private final String clientName;
-        public String lastRequest;
-        public Date lastSeen;
-        public String remoteAddress;
-        public int remotePort;
-        public long segmentsSent;
-        public long segmentBytesSent;
-
-        public CommunicationPartnerMBean(String clientName) throws MalformedObjectNameException {
-            this.clientName = clientName;
-            this.mbeanName = new ObjectName(StandbyStatusMBean.JMX_NAME + ",id=\"Client " + clientName + "\"");
-        }
-
-        public ObjectName getMBeanName() {
-            return this.mbeanName;
-        }
-
-        @Override
-        public String getName() {
-            return this.clientName;
-        }
-
-        @Override
-        public String getRemoteAddress() {
-            return this.remoteAddress;
-        }
-
-        @Override
-        public String getLastRequest() {
-            return this.lastRequest;
-        }
-
-        @Override
-        public int getRemotePort() {
-            return this.remotePort;
-        }
-
-        @Override
-        public String getLastSeenTimestamp() {
-            return this.lastSeen == null ? null : this.lastSeen.toString();
-        }
-
-        @Override
-        public long getTransferredSegments() {
-            return this.segmentsSent;
-        }
-
-        @Override
-        public long getTransferredSegmentBytes() {
-            return this.segmentBytesSent;
-        }
-    }
-
-    private static final Logger log = LoggerFactory
-            .getLogger(CommunicationObserver.class);
-
-    private final String identifier;
-    private final Map<String, CommunicationPartnerMBean> partnerDetails;
-
-    public CommunicationObserver(String myID) {
-        this.identifier = myID;
-        this.partnerDetails = new HashMap<String, CommunicationPartnerMBean>();
-    }
-
-    private void unregister(CommunicationPartnerMBean m) {
-        final MBeanServer jmxServer = ManagementFactory.getPlatformMBeanServer();
-        try {
-            jmxServer.unregisterMBean(m.getMBeanName());
-        }
-        catch (Exception e) {
-            log.error("error unregistering mbean for client '" + m.getName() + "'", e);
-        }
-    }
-
-    public void unregister() {
-        for (CommunicationPartnerMBean m : this.partnerDetails.values()) {
-            unregister(m);
-        }
-    }
-
-    public void gotMessageFrom(String client, String request, InetSocketAddress remote) throws MalformedObjectNameException {
-        log.debug("got message '" + request + "' from client " + client);
-        CommunicationPartnerMBean m = this.partnerDetails.get(client);
-        boolean register = false;
-        if (m == null) {
-            cleanUp();
-            m = new CommunicationPartnerMBean(client);
-            m.remoteAddress = remote.getAddress().getHostAddress();
-            m.remotePort = remote.getPort();
-            register = true;
-        }
-        m.lastSeen = new Date();
-        m.lastRequest = request;
-        this.partnerDetails.put(client, m);
-        if (register) {
-            final MBeanServer jmxServer = ManagementFactory.getPlatformMBeanServer();
-            try {
-                jmxServer.registerMBean(new StandardMBean(m, ObservablePartnerMBean.class), m.getMBeanName());
-            }
-            catch (Exception e) {
-                log.error("can register mbean for client '" + m.getName() + "'", e);
-            }
-        }
-    }
-
-    public void didSendSegmentBytes(String client, int size) {
-        log.debug("did send segment with " + size + " bytes to client " + client);
-        CommunicationPartnerMBean m = this.partnerDetails.get(client);
-        m.segmentsSent++;
-        m.segmentBytesSent += size;
-        this.partnerDetails.put(client, m);
-    }
-
-    public String getID() {
-        return this.identifier;
-    }
-
-    // helper
-
-    private void cleanUp() {
-        while (this.partnerDetails.size() >= MAX_CLIENT_STATISTICS) {
-            CommunicationPartnerMBean oldestEntry = oldestEntry();
-            if (oldestEntry == null) return;
-            log.info("housekeeping: removing statistics for " + oldestEntry.getName());
-            unregister(oldestEntry);
-            this.partnerDetails.remove(oldestEntry.getName());
-        }
-    }
-
-    private CommunicationPartnerMBean oldestEntry() {
-        CommunicationPartnerMBean ret = null;
-        for (CommunicationPartnerMBean m : this.partnerDetails.values()) {
-            if (ret == null || ret.lastSeen.after(m.lastSeen)) ret = m;
-        }
-        return ret;
-    }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.segment.standby.store;
+
+
+import org.apache.jackrabbit.oak.plugins.segment.standby.jmx.StandbyStatusMBean;
+import org.apache.jackrabbit.oak.plugins.segment.standby.jmx.ObservablePartnerMBean;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.management.MBeanServer;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+import javax.management.StandardMBean;
+
+import java.lang.management.ManagementFactory;
+import java.net.InetSocketAddress;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+
+public class CommunicationObserver {
+    private static final int MAX_CLIENT_STATISTICS = 10;
+
+    private class CommunicationPartnerMBean implements ObservablePartnerMBean {
+        private final ObjectName mbeanName;
+        private final String clientName;
+        public String lastRequest;
+        public Date lastSeen;
+        public String remoteAddress;
+        public int remotePort;
+        public long segmentsSent;
+        public long segmentBytesSent;
+        public long binariesSent;
+        public long binariesBytesSent;
+
+        public CommunicationPartnerMBean(String clientName) throws MalformedObjectNameException {
+            this.clientName = clientName;
+            this.mbeanName = new ObjectName(StandbyStatusMBean.JMX_NAME + ",id=\"Client " + clientName + "\"");
+        }
+
+        public ObjectName getMBeanName() {
+            return this.mbeanName;
+        }
+
+        @Override
+        public String getName() {
+            return this.clientName;
+        }
+
+        @Override
+        public String getRemoteAddress() {
+            return this.remoteAddress;
+        }
+
+        @Override
+        public String getLastRequest() {
+            return this.lastRequest;
+        }
+
+        @Override
+        public int getRemotePort() {
+            return this.remotePort;
+        }
+
+        @Override
+        public String getLastSeenTimestamp() {
+            return this.lastSeen == null ? null : this.lastSeen.toString();
+        }
+
+        @Override
+        public long getTransferredSegments() {
+            return this.segmentsSent;
+        }
+
+        @Override
+        public long getTransferredSegmentBytes() {
+            return this.segmentBytesSent;
+        }
+
+        @Override
+        public long getTransferredBinaries() {
+            return this.binariesSent;
+        }
+
+        @Override
+        public long getTransferredBinariesBytes() {
+            return this.binariesBytesSent;
+        }
+    }
+
+    private static final Logger log = LoggerFactory
+            .getLogger(CommunicationObserver.class);
+
+    private final String identifier;
+    private final Map<String, CommunicationPartnerMBean> partnerDetails;
+
+    public CommunicationObserver(String myID) {
+        this.identifier = myID;
+        this.partnerDetails = new HashMap<String, CommunicationPartnerMBean>();
+    }
+
+    private void unregister(CommunicationPartnerMBean m) {
+        final MBeanServer jmxServer = ManagementFactory.getPlatformMBeanServer();
+        try {
+            jmxServer.unregisterMBean(m.getMBeanName());
+        }
+        catch (Exception e) {
+            log.error("error unregistering mbean for client '" + m.getName() + "'", e);
+        }
+    }
+
+    public void unregister() {
+        for (CommunicationPartnerMBean m : this.partnerDetails.values()) {
+            unregister(m);
+        }
+    }
+
+    public void gotMessageFrom(String client, String request, InetSocketAddress remote) throws MalformedObjectNameException {
+        log.debug("got message '" + request + "' from client " + client);
+        CommunicationPartnerMBean m = this.partnerDetails.get(client);
+        boolean register = false;
+        if (m == null) {
+            cleanUp();
+            m = new CommunicationPartnerMBean(client);
+            m.remoteAddress = remote.getAddress().getHostAddress();
+            m.remotePort = remote.getPort();
+            register = true;
+        }
+        m.lastSeen = new Date();
+        m.lastRequest = request;
+        this.partnerDetails.put(client, m);
+        if (register) {
+            final MBeanServer jmxServer = ManagementFactory.getPlatformMBeanServer();
+            try {
+                jmxServer.registerMBean(new StandardMBean(m, ObservablePartnerMBean.class), m.getMBeanName());
+            }
+            catch (Exception e) {
+                log.error("can register mbean for client '" + m.getName() + "'", e);
+            }
+        }
+    }
+
+    public void didSendSegmentBytes(String client, int size) {
+        log.debug("did send segment with " + size + " bytes to client " + client);
+        CommunicationPartnerMBean m = this.partnerDetails.get(client);
+        m.segmentsSent++;
+        m.segmentBytesSent += size;
+        this.partnerDetails.put(client, m);
+    }
+
+    public void didSendBinariesBytes(String client, int size) {
+        log.debug("did send binary with " + size + " bytes to client " + client);
+        CommunicationPartnerMBean m = this.partnerDetails.get(client);
+        m.binariesSent++;
+        m.binariesBytesSent += size;
+        this.partnerDetails.put(client, m);
+    }
+
+    public String getID() {
+        return this.identifier;
+    }
+
+    // helper
+
+    private void cleanUp() {
+        while (this.partnerDetails.size() >= MAX_CLIENT_STATISTICS) {
+            CommunicationPartnerMBean oldestEntry = oldestEntry();
+            if (oldestEntry == null) return;
+            log.info("housekeeping: removing statistics for " + oldestEntry.getName());
+            unregister(oldestEntry);
+            this.partnerDetails.remove(oldestEntry.getName());
+        }
+    }
+
+    private CommunicationPartnerMBean oldestEntry() {
+        CommunicationPartnerMBean ret = null;
+        for (CommunicationPartnerMBean m : this.partnerDetails.values()) {
+            if (ret == null || ret.lastSeen.after(m.lastSeen)) ret = m;
+        }
+        return ret;
+    }
+}

Propchange: jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/store/CommunicationObserver.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/store/RemoteSegmentLoader.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/store/RemoteSegmentLoader.java?rev=1644689&r1=1644688&r2=1644689&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/store/RemoteSegmentLoader.java (original)
+++ jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/store/RemoteSegmentLoader.java Thu Dec 11 16:06:19 2014
@@ -16,14 +16,19 @@
  */
 package org.apache.jackrabbit.oak.plugins.segment.standby.store;
 
+import org.apache.jackrabbit.oak.api.Blob;
 import org.apache.jackrabbit.oak.plugins.segment.Segment;
 
 public interface RemoteSegmentLoader {
 
     Segment readSegment(String id);
 
+    Blob readBlob(String blobId);
+
     void close();
 
     boolean isClosed();
 
+    boolean isRunning();
+
 }

Modified: jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/store/StandbyStoreService.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/store/StandbyStoreService.java?rev=1644689&r1=1644688&r2=1644689&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/store/StandbyStoreService.java (original)
+++ jackrabbit/oak/trunk/oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/store/StandbyStoreService.java Thu Dec 11 16:06:19 2014
@@ -144,7 +144,7 @@ public class StandbyStoreService {
         Dictionary<Object, Object> dictionary = new Hashtable<Object, Object>();
         dictionary.put("scheduler.period", interval);
         dictionary.put("scheduler.concurrent", false);
-        dictionary.put("scheduler.runOn", "SINGLE");
+        // dictionary.put("scheduler.runOn", "SINGLE");
 
         syncReg = context.getBundleContext().registerService(
                 Runnable.class.getName(), sync, dictionary);

Added: jackrabbit/oak/trunk/oak-tarmk-standby/src/test/java/org/apache/jackrabbit/oak/plugins/segment/standby/ExternalPrivateStoreIT.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-standby/src/test/java/org/apache/jackrabbit/oak/plugins/segment/standby/ExternalPrivateStoreIT.java?rev=1644689&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-tarmk-standby/src/test/java/org/apache/jackrabbit/oak/plugins/segment/standby/ExternalPrivateStoreIT.java (added)
+++ jackrabbit/oak/trunk/oak-tarmk-standby/src/test/java/org/apache/jackrabbit/oak/plugins/segment/standby/ExternalPrivateStoreIT.java Thu Dec 11 16:06:19 2014
@@ -0,0 +1,134 @@
+package org.apache.jackrabbit.oak.plugins.segment.standby;
+
+import static org.apache.jackrabbit.oak.plugins.segment.SegmentTestUtils.createTmpTargetDir;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.jackrabbit.core.data.FileDataStore;
+import org.apache.jackrabbit.oak.api.Blob;
+import org.apache.jackrabbit.oak.api.CommitFailedException;
+import org.apache.jackrabbit.oak.api.PropertyState;
+import org.apache.jackrabbit.oak.api.Type;
+import org.apache.jackrabbit.oak.plugins.blob.datastore.DataStoreBlobStore;
+import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeStore;
+import org.apache.jackrabbit.oak.plugins.segment.file.FileStore;
+import org.apache.jackrabbit.oak.plugins.segment.standby.client.StandbyClient;
+import org.apache.jackrabbit.oak.plugins.segment.standby.server.StandbyServer;
+import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
+import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
+import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
+import org.apache.jackrabbit.oak.spi.state.NodeStore;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.io.ByteStreams;
+
+public class ExternalPrivateStoreIT extends TestBase {
+
+    private File primaryStore;
+    private File secondaryStore;
+
+    @Before
+    public void setUp() throws Exception {
+        setUpServerAndClient();
+    }
+
+    @After
+    public void after() {
+        closeServerAndClient();
+        try {
+            FileUtils.deleteDirectory(primaryStore);
+            FileUtils.deleteDirectory(secondaryStore);
+        } catch (IOException e) {
+        }
+    }
+
+    @Override
+    protected FileStore setupPrimary(File d) throws IOException {
+        primaryStore = getPrimaryStoreDir();
+        FileDataStore fds = new FileDataStore();
+        fds.setMinRecordLength(4092);
+        fds.init(primaryStore.getAbsolutePath());
+        DataStoreBlobStore blobStore = new DataStoreBlobStore(fds);
+        return new FileStore(blobStore, d, 1, false);
+    }
+
+    private static File getPrimaryStoreDir() throws IOException {
+        return createTmpTargetDir("ExternalStoreITPrimary");
+    }
+
+    @Override
+    protected FileStore setupSecondary(File d) throws IOException {
+        secondaryStore = getSecondaryStoreDir();
+        FileDataStore fds = new FileDataStore();
+        fds.setMinRecordLength(4092);
+        fds.init(secondaryStore.getAbsolutePath());
+        DataStoreBlobStore blobStore = new DataStoreBlobStore(fds);
+        return new FileStore(blobStore, d, 1, false);
+    }
+
+    private static File getSecondaryStoreDir() throws IOException {
+        return createTmpTargetDir("ExternalStoreITSecondary");
+    }
+
+    @Test
+    public void testSync() throws Exception {
+        final int mb = 1 * 1024 * 1024;
+        final int blobSize = 5 * mb;
+        FileStore primary = getPrimary();
+        FileStore secondary = getSecondary();
+
+        NodeStore store = new SegmentNodeStore(primary);
+        final StandbyServer server = new StandbyServer(getPort(), primary);
+        server.start();
+        byte[] data = addTestContent(store, "server", blobSize);
+
+        StandbyClient cl = new StandbyClient("127.0.0.1", getPort(), secondary);
+        cl.run();
+
+        try {
+            assertEquals(primary.getHead(), secondary.getHead());
+        } finally {
+            server.close();
+            cl.close();
+        }
+
+        assertTrue(primary.size() < mb);
+        assertTrue(secondary.size() < mb);
+
+        PropertyState ps = secondary.getHead().getChildNode("root")
+                .getChildNode("server").getProperty("testBlob");
+        assertNotNull(ps);
+        assertEquals(Type.BINARY.tag(), ps.getType().tag());
+        Blob b = ps.getValue(Type.BINARY);
+        assertEquals(blobSize, b.length());
+        byte[] testData = new byte[blobSize];
+        ByteStreams.readFully(b.getNewStream(), testData);
+        assertArrayEquals(data, testData);
+    }
+
+    private static byte[] addTestContent(NodeStore store, String child, int size)
+            throws CommitFailedException, IOException {
+        NodeBuilder builder = store.getRoot().builder();
+        builder.child(child).setProperty("ts", System.currentTimeMillis());
+
+        byte[] data = new byte[size];
+        new Random().nextBytes(data);
+        Blob blob = store.createBlob(new ByteArrayInputStream(data));
+
+        builder.child(child).setProperty("testBlob", blob);
+
+        store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+        return data;
+    }
+
+}

Propchange: jackrabbit/oak/trunk/oak-tarmk-standby/src/test/java/org/apache/jackrabbit/oak/plugins/segment/standby/ExternalPrivateStoreIT.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: jackrabbit/oak/trunk/oak-tarmk-standby/src/test/java/org/apache/jackrabbit/oak/plugins/segment/standby/ExternalSharedStoreIT.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-tarmk-standby/src/test/java/org/apache/jackrabbit/oak/plugins/segment/standby/ExternalSharedStoreIT.java?rev=1644689&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-tarmk-standby/src/test/java/org/apache/jackrabbit/oak/plugins/segment/standby/ExternalSharedStoreIT.java (added)
+++ jackrabbit/oak/trunk/oak-tarmk-standby/src/test/java/org/apache/jackrabbit/oak/plugins/segment/standby/ExternalSharedStoreIT.java Thu Dec 11 16:06:19 2014
@@ -0,0 +1,127 @@
+package org.apache.jackrabbit.oak.plugins.segment.standby;
+
+import static org.apache.jackrabbit.oak.plugins.segment.SegmentTestUtils.createTmpTargetDir;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.jackrabbit.core.data.FileDataStore;
+import org.apache.jackrabbit.oak.api.Blob;
+import org.apache.jackrabbit.oak.api.CommitFailedException;
+import org.apache.jackrabbit.oak.api.PropertyState;
+import org.apache.jackrabbit.oak.api.Type;
+import org.apache.jackrabbit.oak.plugins.blob.datastore.DataStoreBlobStore;
+import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeStore;
+import org.apache.jackrabbit.oak.plugins.segment.file.FileStore;
+import org.apache.jackrabbit.oak.plugins.segment.standby.client.StandbyClient;
+import org.apache.jackrabbit.oak.plugins.segment.standby.server.StandbyServer;
+import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
+import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
+import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
+import org.apache.jackrabbit.oak.spi.state.NodeStore;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.io.ByteStreams;
+
+public class ExternalSharedStoreIT extends TestBase {
+
+    private File externalStore;
+
+    @Before
+    public void setUp() throws Exception {
+        setUpServerAndClient();
+    }
+
+    @After
+    public void after() {
+        closeServerAndClient();
+        try {
+            FileUtils.deleteDirectory(externalStore);
+        } catch (IOException e) {
+        }
+    }
+
+    @Override
+    protected FileStore setupPrimary(File d) throws IOException {
+        externalStore = getPrimaryStoreDir();
+        FileDataStore fds = new FileDataStore();
+        fds.setMinRecordLength(4092);
+        fds.init(externalStore.getAbsolutePath());
+        DataStoreBlobStore blobStore = new DataStoreBlobStore(fds);
+        return new FileStore(blobStore, d, 1, false);
+    }
+
+    private static File getPrimaryStoreDir() throws IOException {
+        return createTmpTargetDir("ExternalCommonStoreIT");
+    }
+
+    @Override
+    protected FileStore setupSecondary(File d) throws IOException {
+        FileDataStore fds = new FileDataStore();
+        fds.setMinRecordLength(4092);
+        fds.init(externalStore.getAbsolutePath());
+        DataStoreBlobStore blobStore = new DataStoreBlobStore(fds);
+        return new FileStore(blobStore, d, 1, false);
+    }
+
+    @Test
+    public void testSync() throws Exception {
+        final int mb = 1 * 1024 * 1024;
+        final int blobSize = 5 * mb;
+        FileStore primary = getPrimary();
+        FileStore secondary = getSecondary();
+
+        NodeStore store = new SegmentNodeStore(primary);
+        final StandbyServer server = new StandbyServer(getPort(), primary);
+        server.start();
+        byte[] data = addTestContent(store, "server", blobSize);
+
+        StandbyClient cl = new StandbyClient("127.0.0.1", getPort(), secondary);
+        cl.run();
+
+        try {
+            assertEquals(primary.getHead(), secondary.getHead());
+        } finally {
+            server.close();
+            cl.close();
+        }
+
+        assertTrue(primary.size() < mb);
+        assertTrue(secondary.size() < mb);
+
+        PropertyState ps = secondary.getHead().getChildNode("root")
+                .getChildNode("server").getProperty("testBlob");
+        assertNotNull(ps);
+        assertEquals(Type.BINARY.tag(), ps.getType().tag());
+        Blob b = ps.getValue(Type.BINARY);
+        assertEquals(blobSize, b.length());
+        byte[] testData = new byte[blobSize];
+        ByteStreams.readFully(b.getNewStream(), testData);
+        assertArrayEquals(data, testData);
+    }
+
+    private static byte[] addTestContent(NodeStore store, String child, int size)
+            throws CommitFailedException, IOException {
+        NodeBuilder builder = store.getRoot().builder();
+        builder.child(child).setProperty("ts", System.currentTimeMillis());
+
+        byte[] data = new byte[size];
+        new Random().nextBytes(data);
+        Blob blob = store.createBlob(new ByteArrayInputStream(data));
+
+        builder.child(child).setProperty("testBlob", blob);
+
+        store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+        return data;
+    }
+
+}

Propchange: jackrabbit/oak/trunk/oak-tarmk-standby/src/test/java/org/apache/jackrabbit/oak/plugins/segment/standby/ExternalSharedStoreIT.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain