You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by fr...@apache.org on 2016/10/05 19:34:00 UTC

svn commit: r1763480 - in /jackrabbit/oak/trunk/oak-segment-tar/src: main/java/org/apache/jackrabbit/oak/segment/standby/client/ main/java/org/apache/jackrabbit/oak/segment/standby/codec/ main/java/org/apache/jackrabbit/oak/segment/standby/server/ test...

Author: frm
Date: Wed Oct  5 19:34:00 2016
New Revision: 1763480

URL: http://svn.apache.org/viewvc?rev=1763480&view=rev
Log:
OAK-4897 - Use a new RPC to figure out the storage order of segments

Added:
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/GetReferencesResponseHandler.java   (with props)
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/GetReferencesRequest.java   (with props)
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/GetReferencesRequestEncoder.java   (with props)
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/GetReferencesResponse.java   (with props)
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/GetReferencesResponseEncoder.java   (with props)
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbyReferencesReader.java   (with props)
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetReferencesRequestHandler.java   (with props)
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyReferencesReader.java   (with props)
    jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/codec/GetReferencesResponseEncoderTest.java   (with props)
Modified:
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClient.java
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClientSync.java
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClientSyncExecution.java
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/Messages.java
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/RequestDecoder.java
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/ResponseDecoder.java
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyServer.java
    jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/codec/RequestDecoderTest.java
    jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/codec/ResponseDecoderTest.java

Added: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/GetReferencesResponseHandler.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/GetReferencesResponseHandler.java?rev=1763480&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/GetReferencesResponseHandler.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/GetReferencesResponseHandler.java Wed Oct  5 19:34:00 2016
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment.standby.client;
+
+import java.util.Queue;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import org.apache.jackrabbit.oak.segment.standby.codec.GetReferencesResponse;
+
+class GetReferencesResponseHandler extends SimpleChannelInboundHandler<GetReferencesResponse> {
+
+    private final Queue<GetReferencesResponse> queue;
+
+    GetReferencesResponseHandler(Queue<GetReferencesResponse> queue) {
+        this.queue = queue;
+    }
+
+    @Override
+    protected void channelRead0(ChannelHandlerContext ctx, GetReferencesResponse msg) throws Exception {
+        queue.offer(msg);
+    }
+
+}

Propchange: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/GetReferencesResponseHandler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClient.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClient.java?rev=1763480&r1=1763479&r2=1763480&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClient.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClient.java Wed Oct  5 19:34:00 2016
@@ -43,6 +43,9 @@ import org.apache.jackrabbit.oak.segment
 import org.apache.jackrabbit.oak.segment.standby.codec.GetHeadRequest;
 import org.apache.jackrabbit.oak.segment.standby.codec.GetHeadRequestEncoder;
 import org.apache.jackrabbit.oak.segment.standby.codec.GetHeadResponse;
+import org.apache.jackrabbit.oak.segment.standby.codec.GetReferencesRequest;
+import org.apache.jackrabbit.oak.segment.standby.codec.GetReferencesRequestEncoder;
+import org.apache.jackrabbit.oak.segment.standby.codec.GetReferencesResponse;
 import org.apache.jackrabbit.oak.segment.standby.codec.GetSegmentRequest;
 import org.apache.jackrabbit.oak.segment.standby.codec.GetSegmentRequestEncoder;
 import org.apache.jackrabbit.oak.segment.standby.codec.GetSegmentResponse;
@@ -60,6 +63,8 @@ class StandbyClient implements AutoClose
 
     private final BlockingQueue<GetBlobResponse> blobQueue = new LinkedBlockingDeque<>();
 
+    private final BlockingQueue<GetReferencesResponse> referencesQueue = new LinkedBlockingDeque<>();
+
     private final boolean secure;
 
     private final int readTimeoutMs;
@@ -124,12 +129,14 @@ class StandbyClient implements AutoClose
                         p.addLast(new GetHeadRequestEncoder());
                         p.addLast(new GetSegmentRequestEncoder());
                         p.addLast(new GetBlobRequestEncoder());
+                        p.addLast(new GetReferencesRequestEncoder());
 
                         // Handlers
 
                         p.addLast(new GetHeadResponseHandler(headQueue));
                         p.addLast(new GetSegmentResponseHandler(segmentQueue));
                         p.addLast(new GetBlobResponseHandler(blobQueue));
+                        p.addLast(new GetReferencesResponseHandler(referencesQueue));
                     }
 
                 });
@@ -192,4 +199,16 @@ class StandbyClient implements AutoClose
         return response.getBlobData();
     }
 
+    Iterable<String> getReferences(String segmentId) throws InterruptedException {
+        channel.writeAndFlush(new GetReferencesRequest(clientId, segmentId));
+
+        GetReferencesResponse response = referencesQueue.poll(readTimeoutMs, TimeUnit.MILLISECONDS);
+
+        if (response == null) {
+            return null;
+        }
+
+        return response.getReferences();
+    }
+
 }

Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClientSync.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClientSync.java?rev=1763480&r1=1763479&r2=1763480&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClientSync.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClientSync.java Wed Oct  5 19:34:00 2016
@@ -22,8 +22,10 @@ package org.apache.jackrabbit.oak.segmen
 import static org.apache.jackrabbit.oak.commons.IOUtils.humanReadableByteCount;
 
 import java.io.Closeable;
+import java.io.File;
 import java.io.IOException;
 import java.lang.management.ManagementFactory;
+import java.nio.file.Files;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -143,12 +145,9 @@ public final class StandbyClientSync imp
                 new StandbyClientSyncExecution(fileStore, client, newRunningSupplier()).execute();
                 long sizeAfter = fileStore.getStats().getApproximateSize();
 
-                if (autoClean && sizeBefore > 0) {
-                    // if size gain is over 25% call cleanup
-                    if (sizeAfter - sizeBefore > 0.25 * sizeBefore) {
-                        log.info("Store size increased from {} to {}, will run cleanup.", humanReadableByteCount(sizeBefore), humanReadableByteCount(sizeAfter));
-                        fileStore.cleanup();
-                    }
+                if (autoClean && sizeAfter > 1.25 * sizeBefore) {
+                    log.info("Store size increased from {} to {}, will run cleanup.", humanReadableByteCount(sizeBefore), humanReadableByteCount(sizeAfter));
+                    cleanupAndRemove();
                 }
             }
             this.failedRequests = 0;
@@ -165,6 +164,18 @@ public final class StandbyClientSync imp
         }
     }
 
+    private void cleanupAndRemove() throws IOException {
+        for (File file : fileStore.cleanup()) {
+            log.info("Removing file {}", file);
+
+            try {
+                Files.deleteIfExists(file.toPath());
+            } catch (IOException e) {
+                log.warn(String.format("Unable to remove file %s", file), e);
+            }
+        }
+    }
+
     private Supplier<Boolean> newRunningSupplier() {
         return new Supplier<Boolean>() {
 
@@ -229,7 +240,7 @@ public final class StandbyClientSync imp
     @Override
     public void cleanup() {
         try {
-            fileStore.cleanup();
+            cleanupAndRemove();
         } catch (IOException e) {
             log.error("Error while cleaning up", e);
         }

Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClientSyncExecution.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClientSyncExecution.java?rev=1763480&r1=1763479&r2=1763480&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClientSyncExecution.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClientSyncExecution.java Wed Oct  5 19:34:00 2016
@@ -18,15 +18,13 @@
 package org.apache.jackrabbit.oak.segment.standby.client;
 
 import static com.google.common.collect.Maps.newHashMap;
-import static com.google.common.collect.Sets.newHashSet;
 
+import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 
-import javax.annotation.Nonnull;
-
 import com.google.common.base.Supplier;
 import org.apache.jackrabbit.oak.segment.RecordId;
 import org.apache.jackrabbit.oak.segment.Segment;
@@ -54,10 +52,6 @@ class StandbyClientSyncExecution {
 
     private final Supplier<Boolean> running;
 
-    private final Set<UUID> queued = newHashSet();
-
-    private final Set<UUID> local = newHashSet();
-
     private final Map<UUID, Segment> cache = newHashMap();
 
     StandbyClientSyncExecution(FileStore store, StandbyClient client, Supplier<Boolean> running) {
@@ -78,7 +72,8 @@ class StandbyClientSyncExecution {
         SegmentNodeBuilder builder = before.builder();
         SegmentNodeState current = newSegmentNodeState(remoteHead);
         compareAgainstBaseState(current, before, builder);
-        boolean ok = setHead(before, builder.getNodeState());
+        boolean ok = store.getRevisions().setHead(before.getRecordId(), remoteHead);
+        store.flush();
         log.debug("updated head state successfully: {} in {}ms.", ok, System.currentTimeMillis() - t);
     }
 
@@ -90,10 +85,6 @@ class StandbyClientSyncExecution {
         return store.getReader().readNode(id);
     }
 
-    private boolean setHead(@Nonnull SegmentNodeState expected, @Nonnull SegmentNodeState head) {
-        return store.getRevisions().setHead(expected.getRecordId(), head.getRecordId());
-    }
-
     private boolean compareAgainstBaseState(SegmentNodeState current, SegmentNodeState before, SegmentNodeBuilder builder) throws Exception {
         while (true) {
             try {
@@ -110,22 +101,54 @@ class StandbyClientSyncExecution {
 
         batch.offer(segmentId);
 
+        LinkedList<UUID> bulk = new LinkedList<>();
+        LinkedList<UUID> data = new LinkedList<>();
+
+        Set<UUID> visited = new HashSet<>();
+        Set<UUID> queued = new HashSet<>();
+        Set<UUID> local = new HashSet<>();
+
         while (batch.size() > 0) {
             UUID current = batch.remove();
 
-            log.debug("Loading segment {}", current);
-            Segment segment = copySegmentFromPrimary(current);
-
-            log.debug("Marking segment {} as loaded", current);
-            local.add(current);
+            log.debug("Inspecting segment {}", current);
+            visited.add(current);
 
-            if (!SegmentId.isDataSegmentId(current.getLeastSignificantBits())) {
+            // Add the current segment ID at the beginning of the respective
+            // list, depending on its type. This allows to process those
+            // segments in an optimal topological order later on. If the current
+            // segment is a bulk segment, we can skip the rest of the loop,
+            // since bulk segments don't reference any other segment.
+
+            if (SegmentId.isDataSegmentId(current.getLeastSignificantBits())) {
+                data.addFirst(current);
+            } else {
+                bulk.addFirst(current);
                 continue;
             }
 
-            log.debug("Inspecting segment {} for references", current);
-            for (int i = 0; i < segment.getReferencedSegmentIdCount(); i++) {
-                UUID referenced = segment.getReferencedSegmentId(i);
+            for (String s : readReferences(current)) {
+                UUID referenced = UUID.fromString(s);
+
+                // Short circuit for the "backward reference". The segment graph
+                // is not guaranteed to be acyclic, so there might be segments
+                // pointing back to a previously visited (but locally
+                // unavailable) segment.
+
+                if (visited.contains(referenced)) {
+                    continue;
+                }
+
+                // Short circuit for the "diamond problem". Imagine that segment
+                // S1 references S2 and S3 and both S2 and S3 reference S4.
+                // These references form the shape of a diamond. If the segments
+                // are processed in the order S1, S2, S3, then S4 is added twice
+                // to the 'batch' queue. The following check prevents processing
+                // S4 twice or more.
+
+                if (queued.contains(referenced)) {
+                    continue;
+                }
 
                 // Short circuit for the "sharing-is-caring problem". If many
                 // new segments are sharing segments that are already locally
@@ -140,20 +163,6 @@ class StandbyClientSyncExecution {
 
                 if (isLocal(referenced)) {
                     local.add(referenced);
-                }
-
-                if (local.contains(referenced)) {
-                    continue;
-                }
-
-                // Short circuit for the "diamond problem". Imagine that segment S1
-                // references S2 and S3 and both S2 and S3 reference S4. These
-                // references form the shape of a diamond. If the segments are
-                // processed in the order S1, S2, S3, then S4 is added twice to the
-                // 'batch' queue. The following check prevents processing S4 twice
-                // or more.
-
-                if (queued.contains(referenced)) {
                     continue;
                 }
 
@@ -163,16 +172,31 @@ class StandbyClientSyncExecution {
                 // queue and transfer the segment later.
 
                 log.debug("Found reference from {} to {}", current, referenced);
-
-                if (SegmentId.isDataSegmentId(referenced.getLeastSignificantBits())) {
-                    batch.add(referenced);
-                } else {
-                    batch.addFirst(referenced);
-                }
-
+                batch.add(referenced);
                 queued.add(referenced);
             }
         }
+
+        for (UUID id : bulk) {
+            log.info("Copying bulk segment {} from primary", id);
+            copySegmentFromPrimary(id);
+        }
+
+        for (UUID id : data) {
+            log.info("Copying data segment {} from primary", id);
+            copySegmentFromPrimary(id);
+        }
+
+    }
+
+    private Iterable<String> readReferences(UUID id) throws InterruptedException {
+        Iterable<String> references = client.getReferences(id.toString());
+
+        if (references == null) {
+            throw new IllegalStateException(String.format("Unable to read references of segment %s from primary", id));
+        }
+
+        return references;
     }
 
     private boolean isLocal(UUID id) {
@@ -192,12 +216,12 @@ class StandbyClientSyncExecution {
         return persisted;
     }
 
-    private Segment copySegmentFromPrimary(UUID uuid) throws Exception {
+    private void copySegmentFromPrimary(UUID uuid) throws Exception {
         Segment result = cache.get(uuid);
 
         if (result != null) {
             log.debug("Segment {} was found in the local cache", uuid);
-            return result;
+            return;
         }
 
         byte[] data = client.getSegment(uuid.toString());
@@ -212,7 +236,6 @@ class StandbyClientSyncExecution {
         store.writeSegment(segmentId, data, 0, data.length);
         result = segmentId.getSegment();
         cache.put(uuid, result);
-        return result;
     }
 
 }

Added: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/GetReferencesRequest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/GetReferencesRequest.java?rev=1763480&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/GetReferencesRequest.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/GetReferencesRequest.java Wed Oct  5 19:34:00 2016
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment.standby.codec;
+
+public class GetReferencesRequest {
+
+    private final String clientId;
+
+    private final String segmentId;
+
+    public GetReferencesRequest(String clientId, String segmentId) {
+        this.clientId = clientId;
+        this.segmentId = segmentId;
+    }
+
+    public String getClientId() {
+        return clientId;
+    }
+
+    public String getSegmentId() {
+        return segmentId;
+    }
+
+}

Propchange: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/GetReferencesRequest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/GetReferencesRequestEncoder.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/GetReferencesRequestEncoder.java?rev=1763480&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/GetReferencesRequestEncoder.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/GetReferencesRequestEncoder.java Wed Oct  5 19:34:00 2016
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment.standby.codec;
+
+import java.util.List;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.MessageToMessageEncoder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class GetReferencesRequestEncoder extends MessageToMessageEncoder<GetReferencesRequest> {
+
+    private final Logger log = LoggerFactory.getLogger(GetReferencesRequestEncoder.class);
+
+    @Override
+    protected void encode(ChannelHandlerContext ctx, GetReferencesRequest msg, List<Object> out) throws Exception {
+        log.debug("Sending request from client {} for references of segment {}", msg.getClientId(), msg.getSegmentId());
+        out.add(Messages.newGetReferencesRequest(msg.getClientId(), msg.getSegmentId()));
+    }
+
+}

Propchange: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/GetReferencesRequestEncoder.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/GetReferencesResponse.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/GetReferencesResponse.java?rev=1763480&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/GetReferencesResponse.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/GetReferencesResponse.java Wed Oct  5 19:34:00 2016
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment.standby.codec;
+
+public class GetReferencesResponse {
+
+    private final String clientId;
+
+    private final String segmentId;
+
+    private final Iterable<String> references;
+
+    public GetReferencesResponse(String clientId, String segmentId, Iterable<String> references) {
+        this.clientId = clientId;
+        this.segmentId = segmentId;
+        this.references = references;
+    }
+
+    public String getClientId() {
+        return clientId;
+    }
+
+    public String getSegmentId() {
+        return segmentId;
+    }
+
+    public Iterable<String> getReferences() {
+        return references;
+    }
+
+}

Propchange: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/GetReferencesResponse.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/GetReferencesResponseEncoder.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/GetReferencesResponseEncoder.java?rev=1763480&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/GetReferencesResponseEncoder.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/GetReferencesResponseEncoder.java Wed Oct  5 19:34:00 2016
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment.standby.codec;
+
+import com.google.common.base.Charsets;
+import com.google.common.base.Joiner;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.MessageToByteEncoder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class GetReferencesResponseEncoder extends MessageToByteEncoder<GetReferencesResponse> {
+
+    private static final Logger log = LoggerFactory.getLogger(GetReferencesResponseEncoder.class);
+
+    @Override
+    protected void encode(ChannelHandlerContext ctx, GetReferencesResponse msg, ByteBuf out) throws Exception {
+        log.debug("Sending references of segment {} to client {}", msg.getSegmentId(), msg.getClientId());
+        encode(msg.getSegmentId(), msg.getReferences(), out);
+    }
+
+    private void encode(String segmentId, Iterable<String> references, ByteBuf out) {
+        byte[] data = serialize(segmentId, references).getBytes(Charsets.UTF_8);
+        out.writeInt(data.length + 1);
+        out.writeByte(Messages.HEADER_REFERENCES);
+        out.writeBytes(data);
+    }
+
+    private String serialize(String segmentId, Iterable<String> references) {
+        return segmentId + ":" + Joiner.on(",").join(references);
+    }
+
+}

Propchange: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/GetReferencesResponseEncoder.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/Messages.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/Messages.java?rev=1763480&r1=1763479&r2=1763480&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/Messages.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/Messages.java Wed Oct  5 19:34:00 2016
@@ -27,12 +27,16 @@ class Messages {
 
     static final byte HEADER_BLOB = 0x02;
 
+    static final byte HEADER_REFERENCES = 0x03;
+
     static final String GET_HEAD = "h";
 
     static final String GET_SEGMENT = "s.";
 
     static final String GET_BLOB = "b.";
 
+    static final String GET_REFERENCES = "r.";
+
     private static final String MAGIC = "Standby-CMD@";
 
     private static final String SEPARATOR = ":";
@@ -70,6 +74,14 @@ class Messages {
         return newGetSegmentRequest(clientId, segmentId, true);
     }
 
+    static String newGetReferencesRequest(String clientId, String segmentId, boolean delimited) {
+        return newRequest(clientId, GET_REFERENCES + segmentId, delimited);
+    }
+
+    static String newGetReferencesRequest(String clientId, String segmentId) {
+        return newGetReferencesRequest(clientId, segmentId, true);
+    }
+
     static String newGetBlobRequest(String clientId, String blobId, boolean delimited) {
         return newRequest(clientId, GET_BLOB + blobId, delimited);
     }

Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/RequestDecoder.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/RequestDecoder.java?rev=1763480&r1=1763479&r2=1763480&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/RequestDecoder.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/RequestDecoder.java Wed Oct  5 19:34:00 2016
@@ -44,6 +44,9 @@ public class RequestDecoder extends Mess
         } else if (request.startsWith(Messages.GET_SEGMENT)) {
             log.debug("Parsed 'get segment' message");
             out.add(new GetSegmentRequest(Messages.extractClientFrom(msg), request.substring(Messages.GET_SEGMENT.length())));
+        } else if (request.startsWith(Messages.GET_REFERENCES)) {
+            log.debug("Parsed 'get references' message");
+            out.add(new GetReferencesRequest(Messages.extractClientFrom(msg), request.substring(Messages.GET_REFERENCES.length())));
         } else {
             log.debug("Received unrecognizable message {}, dropping", msg);
         }

Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/ResponseDecoder.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/ResponseDecoder.java?rev=1763480&r1=1763479&r2=1763480&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/ResponseDecoder.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/ResponseDecoder.java Wed Oct  5 19:34:00 2016
@@ -17,6 +17,9 @@
 
 package org.apache.jackrabbit.oak.segment.standby.codec;
 
+import static java.util.Arrays.asList;
+import static java.util.Collections.emptyList;
+
 import java.util.List;
 import java.util.UUID;
 
@@ -49,6 +52,10 @@ public class ResponseDecoder extends Byt
                 log.debug("Decoding 'get blob' response");
                 decodeGetBlobResponse(length, in, out);
                 break;
+            case Messages.HEADER_REFERENCES:
+                log.debug("Decoding 'get references' response");
+                decodeGetReferencesResponse(length, in, out);
+                break;
             default:
                 log.debug("Invalid type, dropping message");
         }
@@ -101,6 +108,33 @@ public class ResponseDecoder extends Byt
         out.add(new GetBlobResponse(null, blobId, blobData));
     }
 
+    private void decodeGetReferencesResponse(int length, ByteBuf in, List<Object> out) {
+        byte[] data = new byte[length - 1];
+
+        in.readBytes(data);
+
+        String body = new String(data, Charsets.UTF_8);
+
+        int colon = body.indexOf(":");
+
+        if (colon < 0) {
+            return;
+        }
+
+        String segmentId = body.substring(0, colon);
+        String referencesList = body.substring(colon + 1);
+
+        List<String> references;
+
+        if (referencesList.isEmpty()) {
+            references = emptyList();
+        } else {
+            references = asList(referencesList.split(","));
+        }
+
+        out.add(new GetReferencesResponse(null, segmentId, references));
+    }
+
     private long hash(byte[] data) {
         return Hashing.murmur3_32().newHasher().putBytes(data).hash().padToLong();
     }

Added: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbyReferencesReader.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbyReferencesReader.java?rev=1763480&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbyReferencesReader.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbyReferencesReader.java Wed Oct  5 19:34:00 2016
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment.standby.server;
+
+import static com.google.common.collect.Lists.newArrayList;
+
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.jackrabbit.oak.segment.Segment;
+import org.apache.jackrabbit.oak.segment.SegmentId;
+import org.apache.jackrabbit.oak.segment.SegmentNotFoundException;
+import org.apache.jackrabbit.oak.segment.file.FileStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class DefaultStandbyReferencesReader implements StandbyReferencesReader {
+
+    private static final Logger log = LoggerFactory.getLogger(DefaultStandbyReferencesReader.class);
+
+    private final FileStore store;
+
+    public DefaultStandbyReferencesReader(FileStore store) {
+        this.store = store;
+    }
+
+    @Override
+    public Iterable<String> readReferences(String id) {
+        UUID uuid = UUID.fromString(id);
+
+        long msb = uuid.getMostSignificantBits();
+        long lsb = uuid.getLeastSignificantBits();
+
+        Segment segment = readSegment(store.newSegmentId(msb, lsb));
+
+        if (segment == null) {
+            return null;
+        }
+
+        List<String> references = newArrayList();
+
+        for (int i = 0; i < segment.getReferencedSegmentIdCount(); i++) {
+            references.add(segment.getReferencedSegmentId(i).toString());
+        }
+
+        return references;
+    }
+
+    private Segment readSegment(SegmentId id) {
+        try {
+            return store.readSegment(id);
+        } catch (SegmentNotFoundException e) {
+            log.warn(String.format("Unable to read segment %s", id), e);
+        }
+
+        return null;
+    }
+
+}

Propchange: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbyReferencesReader.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetReferencesRequestHandler.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetReferencesRequestHandler.java?rev=1763480&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetReferencesRequestHandler.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetReferencesRequestHandler.java Wed Oct  5 19:34:00 2016
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment.standby.server;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import org.apache.jackrabbit.oak.segment.standby.codec.GetReferencesRequest;
+import org.apache.jackrabbit.oak.segment.standby.codec.GetReferencesResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class GetReferencesRequestHandler extends SimpleChannelInboundHandler<GetReferencesRequest> {
+
+    private static final Logger log = LoggerFactory.getLogger(GetReferencesRequestHandler.class);
+
+    private final StandbyReferencesReader reader;
+
+    public GetReferencesRequestHandler(StandbyReferencesReader reader) {
+        this.reader = reader;
+    }
+
+    @Override
+    protected void channelRead0(ChannelHandlerContext ctx, GetReferencesRequest msg) throws Exception {
+        log.debug("Reading references of segment {} for client {}", msg.getSegmentId(), msg.getClientId());
+
+        Iterable<String> references = reader.readReferences(msg.getSegmentId());
+
+        if (references == null) {
+            log.debug("References for segment {} not found, discarding request from client {}", msg.getSegmentId(), msg.getClientId());
+            return;
+        }
+
+        ctx.writeAndFlush(new GetReferencesResponse(msg.getClientId(), msg.getSegmentId(), references));
+    }
+
+}

Propchange: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetReferencesRequestHandler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyReferencesReader.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyReferencesReader.java?rev=1763480&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyReferencesReader.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyReferencesReader.java Wed Oct  5 19:34:00 2016
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment.standby.server;
+
+interface StandbyReferencesReader {
+
+    Iterable<String> readReferences(String segmentId);
+
+}

Propchange: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyReferencesReader.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyServer.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyServer.java?rev=1763480&r1=1763479&r2=1763480&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyServer.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyServer.java Wed Oct  5 19:34:00 2016
@@ -40,8 +40,10 @@ import io.netty.handler.ssl.SslContext;
 import io.netty.handler.ssl.SslContextBuilder;
 import io.netty.handler.ssl.util.SelfSignedCertificate;
 import io.netty.util.CharsetUtil;
+import org.apache.jackrabbit.oak.segment.file.FileStore;
 import org.apache.jackrabbit.oak.segment.standby.codec.GetBlobResponseEncoder;
 import org.apache.jackrabbit.oak.segment.standby.codec.GetHeadResponseEncoder;
+import org.apache.jackrabbit.oak.segment.standby.codec.GetReferencesResponseEncoder;
 import org.apache.jackrabbit.oak.segment.standby.codec.GetSegmentResponseEncoder;
 import org.apache.jackrabbit.oak.segment.standby.codec.RequestDecoder;
 import org.apache.jackrabbit.oak.segment.standby.store.CommunicationObserver;
@@ -162,13 +164,17 @@ class StandbyServer implements AutoClose
                 p.addLast(new GetHeadResponseEncoder());
                 p.addLast(new GetSegmentResponseEncoder());
                 p.addLast(new GetBlobResponseEncoder());
+                p.addLast(new GetReferencesResponseEncoder());
                 p.addLast(new ResponseObserverHandler(builder.observer));
 
                 // Handlers
 
-                p.addLast(new GetHeadRequestHandler(new DefaultStandbyHeadReader(builder.storeProvider.provideStore())));
-                p.addLast(new GetSegmentRequestHandler(new DefaultStandbySegmentReader(builder.storeProvider.provideStore())));
-                p.addLast(new GetBlobRequestHandler(new DefaultStandbyBlobReader(builder.storeProvider.provideStore())));
+                FileStore store = builder.storeProvider.provideStore();
+
+                p.addLast(new GetHeadRequestHandler(new DefaultStandbyHeadReader(store)));
+                p.addLast(new GetSegmentRequestHandler(new DefaultStandbySegmentReader(store)));
+                p.addLast(new GetBlobRequestHandler(new DefaultStandbyBlobReader(store)));
+                p.addLast(new GetReferencesRequestHandler(new DefaultStandbyReferencesReader(store)));
             }
         });
     }

Added: jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/codec/GetReferencesResponseEncoderTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/codec/GetReferencesResponseEncoderTest.java?rev=1763480&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/codec/GetReferencesResponseEncoderTest.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/codec/GetReferencesResponseEncoderTest.java Wed Oct  5 19:34:00 2016
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment.standby.codec;
+
+import static java.util.Arrays.asList;
+import static org.junit.Assert.assertEquals;
+
+import com.google.common.base.Charsets;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.embedded.EmbeddedChannel;
+import org.junit.Test;
+
+public class GetReferencesResponseEncoderTest {
+
+    @Test
+    public void encodeResponse() throws Exception {
+        EmbeddedChannel channel = new EmbeddedChannel(new GetReferencesResponseEncoder());
+        channel.writeOutbound(new GetReferencesResponse("clientId", "a", asList("b", "c")));
+        ByteBuf buffer = (ByteBuf) channel.readOutbound();
+
+        String body = "a:b,c";
+        byte[] data = body.getBytes(Charsets.UTF_8);
+
+        ByteBuf expected = Unpooled.buffer();
+        expected.writeInt(data.length + 1);
+        expected.writeByte(Messages.HEADER_REFERENCES);
+        expected.writeBytes(data);
+
+        assertEquals(expected, buffer);
+
+    }
+}

Propchange: jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/codec/GetReferencesResponseEncoderTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/codec/RequestDecoderTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/codec/RequestDecoderTest.java?rev=1763480&r1=1763479&r2=1763480&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/codec/RequestDecoderTest.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/codec/RequestDecoderTest.java Wed Oct  5 19:34:00 2016
@@ -52,6 +52,15 @@ public class RequestDecoderTest {
     }
 
     @Test
+    public void shouldDecodeValidGetReferencesRequests() throws Exception {
+        EmbeddedChannel channel = new EmbeddedChannel(new RequestDecoder());
+        channel.writeInbound(Messages.newGetReferencesRequest("clientId", "segmentId", false));
+        GetReferencesRequest request = (GetReferencesRequest) channel.readInbound();
+        assertEquals("clientId", request.getClientId());
+        assertEquals("segmentId", request.getSegmentId());
+    }
+
+    @Test
     public void shouldDropInvalidMessages() throws Exception {
         EmbeddedChannel channel = new EmbeddedChannel(new RequestDecoder());
         channel.writeInbound("Standby-CMD@clientId:z");

Modified: jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/codec/ResponseDecoderTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/codec/ResponseDecoderTest.java?rev=1763480&r1=1763479&r2=1763480&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/codec/ResponseDecoderTest.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/codec/ResponseDecoderTest.java Wed Oct  5 19:34:00 2016
@@ -17,10 +17,15 @@
 
 package org.apache.jackrabbit.oak.segment.standby.codec;
 
+import static com.google.common.collect.Iterables.elementsEqual;
+import static com.google.common.collect.Lists.newArrayList;
+import static java.util.Arrays.asList;
+import static java.util.Collections.emptyList;
 import static org.apache.jackrabbit.oak.segment.standby.StandbyTestUtils.hash;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 
 import java.util.UUID;
 
@@ -122,6 +127,54 @@ public class ResponseDecoderTest {
     }
 
     @Test
+    public void shouldDecodeValidGetReferencesResponses() throws Exception {
+        byte[] data = "a:b,c".getBytes(Charsets.UTF_8);
+
+        ByteBuf buf = Unpooled.buffer();
+        buf.writeInt(data.length + 1);
+        buf.writeByte(Messages.HEADER_REFERENCES);
+        buf.writeBytes(data);
+
+        EmbeddedChannel channel = new EmbeddedChannel(new ResponseDecoder());
+        channel.writeInbound(buf);
+        GetReferencesResponse response = (GetReferencesResponse) channel.readInbound();
+        assertEquals("a", response.getSegmentId());
+        assertTrue(elementsEqual(asList("b", "c"), response.getReferences()));
+    }
+
+    @Test
+    public void shouldDecodeValidSingleElementGetReferencesResponses() throws Exception {
+        byte[] data = "a:b".getBytes(Charsets.UTF_8);
+
+        ByteBuf buf = Unpooled.buffer();
+        buf.writeInt(data.length + 1);
+        buf.writeByte(Messages.HEADER_REFERENCES);
+        buf.writeBytes(data);
+
+        EmbeddedChannel channel = new EmbeddedChannel(new ResponseDecoder());
+        channel.writeInbound(buf);
+        GetReferencesResponse response = (GetReferencesResponse) channel.readInbound();
+        assertEquals("a", response.getSegmentId());
+        assertTrue(elementsEqual(newArrayList("b"), response.getReferences()));
+    }
+
+    @Test
+    public void shouldDecodeValidZeroElementsGetReferencesResponses() throws Exception {
+        byte[] data = "a:".getBytes(Charsets.UTF_8);
+
+        ByteBuf buf = Unpooled.buffer();
+        buf.writeInt(data.length + 1);
+        buf.writeByte(Messages.HEADER_REFERENCES);
+        buf.writeBytes(data);
+
+        EmbeddedChannel channel = new EmbeddedChannel(new ResponseDecoder());
+        channel.writeInbound(buf);
+        GetReferencesResponse response = (GetReferencesResponse) channel.readInbound();
+        assertEquals("a", response.getSegmentId());
+        assertTrue(elementsEqual(emptyList(), response.getReferences()));
+    }
+
+    @Test
     public void shouldDropInvalidGetSegmentResponses() throws Exception {
         UUID uuid = new UUID(1, 2);
         byte[] data = new byte[] {3, 4, 5};