You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by fr...@apache.org on 2016/10/05 19:34:00 UTC
svn commit: r1763480 - in /jackrabbit/oak/trunk/oak-segment-tar/src:
main/java/org/apache/jackrabbit/oak/segment/standby/client/
main/java/org/apache/jackrabbit/oak/segment/standby/codec/
main/java/org/apache/jackrabbit/oak/segment/standby/server/ test...
Author: frm
Date: Wed Oct 5 19:34:00 2016
New Revision: 1763480
URL: http://svn.apache.org/viewvc?rev=1763480&view=rev
Log:
OAK-4897 - Use a new RPC to figure out the storage order of segments
Added:
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/GetReferencesResponseHandler.java (with props)
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/GetReferencesRequest.java (with props)
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/GetReferencesRequestEncoder.java (with props)
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/GetReferencesResponse.java (with props)
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/GetReferencesResponseEncoder.java (with props)
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbyReferencesReader.java (with props)
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetReferencesRequestHandler.java (with props)
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyReferencesReader.java (with props)
jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/codec/GetReferencesResponseEncoderTest.java (with props)
Modified:
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClient.java
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClientSync.java
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClientSyncExecution.java
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/Messages.java
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/RequestDecoder.java
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/ResponseDecoder.java
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyServer.java
jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/codec/RequestDecoderTest.java
jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/codec/ResponseDecoderTest.java
Added: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/GetReferencesResponseHandler.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/GetReferencesResponseHandler.java?rev=1763480&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/GetReferencesResponseHandler.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/GetReferencesResponseHandler.java Wed Oct 5 19:34:00 2016
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment.standby.client;
+
+import java.util.Queue;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import org.apache.jackrabbit.oak.segment.standby.codec.GetReferencesResponse;
+
+class GetReferencesResponseHandler extends SimpleChannelInboundHandler<GetReferencesResponse> {
+ // Inbound handler that forwards every GetReferencesResponse it receives to a caller-supplied queue.
+ private final Queue<GetReferencesResponse> queue;
+ // The handler only offers to the queue; consuming from it is left to whoever supplied it.
+ GetReferencesResponseHandler(Queue<GetReferencesResponse> queue) {
+ this.queue = queue;
+ }
+ // The boolean result of offer() is ignored, so a queue that rejects the element drops the response silently.
+ @Override
+ protected void channelRead0(ChannelHandlerContext ctx, GetReferencesResponse msg) throws Exception {
+ queue.offer(msg);
+ }
+
+}
Propchange: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/GetReferencesResponseHandler.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClient.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClient.java?rev=1763480&r1=1763479&r2=1763480&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClient.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClient.java Wed Oct 5 19:34:00 2016
@@ -43,6 +43,9 @@ import org.apache.jackrabbit.oak.segment
import org.apache.jackrabbit.oak.segment.standby.codec.GetHeadRequest;
import org.apache.jackrabbit.oak.segment.standby.codec.GetHeadRequestEncoder;
import org.apache.jackrabbit.oak.segment.standby.codec.GetHeadResponse;
+import org.apache.jackrabbit.oak.segment.standby.codec.GetReferencesRequest;
+import org.apache.jackrabbit.oak.segment.standby.codec.GetReferencesRequestEncoder;
+import org.apache.jackrabbit.oak.segment.standby.codec.GetReferencesResponse;
import org.apache.jackrabbit.oak.segment.standby.codec.GetSegmentRequest;
import org.apache.jackrabbit.oak.segment.standby.codec.GetSegmentRequestEncoder;
import org.apache.jackrabbit.oak.segment.standby.codec.GetSegmentResponse;
@@ -60,6 +63,8 @@ class StandbyClient implements AutoClose
private final BlockingQueue<GetBlobResponse> blobQueue = new LinkedBlockingDeque<>();
+ private final BlockingQueue<GetReferencesResponse> referencesQueue = new LinkedBlockingDeque<>();
+
private final boolean secure;
private final int readTimeoutMs;
@@ -124,12 +129,14 @@ class StandbyClient implements AutoClose
p.addLast(new GetHeadRequestEncoder());
p.addLast(new GetSegmentRequestEncoder());
p.addLast(new GetBlobRequestEncoder());
+ p.addLast(new GetReferencesRequestEncoder());
// Handlers
p.addLast(new GetHeadResponseHandler(headQueue));
p.addLast(new GetSegmentResponseHandler(segmentQueue));
p.addLast(new GetBlobResponseHandler(blobQueue));
+ p.addLast(new GetReferencesResponseHandler(referencesQueue));
}
});
@@ -192,4 +199,16 @@ class StandbyClient implements AutoClose
return response.getBlobData();
}
+ Iterable<String> getReferences(String segmentId) throws InterruptedException { // Sends a GetReferencesRequest for segmentId over the channel and waits for the matching response.
+ channel.writeAndFlush(new GetReferencesRequest(clientId, segmentId));
+ // Wait at most readTimeoutMs milliseconds for a response to show up on the references queue.
+ GetReferencesResponse response = referencesQueue.poll(readTimeoutMs, TimeUnit.MILLISECONDS);
+ // A null response means the wait timed out; this is reported to the caller as a null return value.
+ if (response == null) {
+ return null;
+ }
+ // NOTE(review): response.getSegmentId() is never compared with segmentId, so a late answer to an earlier, timed-out request would be returned here - confirm whether that can happen.
+ return response.getReferences();
+ }
+
}
Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClientSync.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClientSync.java?rev=1763480&r1=1763479&r2=1763480&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClientSync.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClientSync.java Wed Oct 5 19:34:00 2016
@@ -22,8 +22,10 @@ package org.apache.jackrabbit.oak.segmen
import static org.apache.jackrabbit.oak.commons.IOUtils.humanReadableByteCount;
import java.io.Closeable;
+import java.io.File;
import java.io.IOException;
import java.lang.management.ManagementFactory;
+import java.nio.file.Files;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -143,12 +145,9 @@ public final class StandbyClientSync imp
new StandbyClientSyncExecution(fileStore, client, newRunningSupplier()).execute();
long sizeAfter = fileStore.getStats().getApproximateSize();
- if (autoClean && sizeBefore > 0) {
- // if size gain is over 25% call cleanup
- if (sizeAfter - sizeBefore > 0.25 * sizeBefore) {
- log.info("Store size increased from {} to {}, will run cleanup.", humanReadableByteCount(sizeBefore), humanReadableByteCount(sizeAfter));
- fileStore.cleanup();
- }
+ if (autoClean && sizeAfter > 1.25 * sizeBefore) {
+ log.info("Store size increased from {} to {}, will run cleanup.", humanReadableByteCount(sizeBefore), humanReadableByteCount(sizeAfter));
+ cleanupAndRemove();
}
}
this.failedRequests = 0;
@@ -165,6 +164,18 @@ public final class StandbyClientSync imp
}
}
+ private void cleanupAndRemove() throws IOException { // Runs fileStore.cleanup() and deletes each file it returns.
+ for (File file : fileStore.cleanup()) {
+ log.info("Removing file {}", file);
+ // Best effort: a failed deletion is logged as a warning and the loop moves on to the next file.
+ try {
+ Files.deleteIfExists(file.toPath());
+ } catch (IOException e) {
+ log.warn(String.format("Unable to remove file %s", file), e);
+ }
+ }
+ }
+
private Supplier<Boolean> newRunningSupplier() {
return new Supplier<Boolean>() {
@@ -229,7 +240,7 @@ public final class StandbyClientSync imp
@Override
public void cleanup() {
try {
- fileStore.cleanup();
+ cleanupAndRemove();
} catch (IOException e) {
log.error("Error while cleaning up", e);
}
Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClientSyncExecution.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClientSyncExecution.java?rev=1763480&r1=1763479&r2=1763480&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClientSyncExecution.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClientSyncExecution.java Wed Oct 5 19:34:00 2016
@@ -18,15 +18,13 @@
package org.apache.jackrabbit.oak.segment.standby.client;
import static com.google.common.collect.Maps.newHashMap;
-import static com.google.common.collect.Sets.newHashSet;
+import java.util.HashSet;
import java.util.LinkedList;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
-import javax.annotation.Nonnull;
-
import com.google.common.base.Supplier;
import org.apache.jackrabbit.oak.segment.RecordId;
import org.apache.jackrabbit.oak.segment.Segment;
@@ -54,10 +52,6 @@ class StandbyClientSyncExecution {
private final Supplier<Boolean> running;
- private final Set<UUID> queued = newHashSet();
-
- private final Set<UUID> local = newHashSet();
-
private final Map<UUID, Segment> cache = newHashMap();
StandbyClientSyncExecution(FileStore store, StandbyClient client, Supplier<Boolean> running) {
@@ -78,7 +72,8 @@ class StandbyClientSyncExecution {
SegmentNodeBuilder builder = before.builder();
SegmentNodeState current = newSegmentNodeState(remoteHead);
compareAgainstBaseState(current, before, builder);
- boolean ok = setHead(before, builder.getNodeState());
+ boolean ok = store.getRevisions().setHead(before.getRecordId(), remoteHead);
+ store.flush();
log.debug("updated head state successfully: {} in {}ms.", ok, System.currentTimeMillis() - t);
}
@@ -90,10 +85,6 @@ class StandbyClientSyncExecution {
return store.getReader().readNode(id);
}
- private boolean setHead(@Nonnull SegmentNodeState expected, @Nonnull SegmentNodeState head) {
- return store.getRevisions().setHead(expected.getRecordId(), head.getRecordId());
- }
-
private boolean compareAgainstBaseState(SegmentNodeState current, SegmentNodeState before, SegmentNodeBuilder builder) throws Exception {
while (true) {
try {
@@ -110,22 +101,54 @@ class StandbyClientSyncExecution {
batch.offer(segmentId);
+ LinkedList<UUID> bulk = new LinkedList<>();
+ LinkedList<UUID> data = new LinkedList<>();
+
+ Set<UUID> visited = new HashSet<>();
+ Set<UUID> queued = new HashSet<>();
+ Set<UUID> local = new HashSet<>();
+
while (batch.size() > 0) {
UUID current = batch.remove();
- log.debug("Loading segment {}", current);
- Segment segment = copySegmentFromPrimary(current);
-
- log.debug("Marking segment {} as loaded", current);
- local.add(current);
+ log.debug("Inspecting segment {}", current);
+ visited.add(current);
- if (!SegmentId.isDataSegmentId(current.getLeastSignificantBits())) {
+ // Add the current segment ID at the beginning of the respective
+ // list, depending on its type. This allows to process those
+ // segments in an optimal topological order later on. If the current
+ // segment is a bulk segment, we can skip the rest of the loop,
+ // since bulk segments don't reference any other segment.
+
+ if (SegmentId.isDataSegmentId(current.getLeastSignificantBits())) {
+ data.addFirst(current);
+ } else {
+ bulk.addFirst(current);
continue;
}
- log.debug("Inspecting segment {} for references", current);
- for (int i = 0; i < segment.getReferencedSegmentIdCount(); i++) {
- UUID referenced = segment.getReferencedSegmentId(i);
+ for (String s : readReferences(current)) {
+ UUID referenced = UUID.fromString(s);
+
+ // Short circuit for the "backward reference". The segment graph
+ // is not guaranteed to be acyclic, so there might be segments
+ // pointing back to a previously visited (but locally
+ // unavailable) segment.
+
+ if (visited.contains(referenced)) {
+ continue;
+ }
+
+ // Short circuit for the "diamond problem". Imagine that segment
+ // S1 references S2 and S3 and both S2 and S3 reference S4.
+ // These references form the shape of a diamond. If the segments
+ // are processed in the order S1, S2, S3, then S4 is added twice
+ // to the 'batch' queue. The following check prevents processing
+ // S4 twice or more.
+
+ if (queued.contains(referenced)) {
+ continue;
+ }
// Short circuit for the "sharing-is-caring problem". If many
// new segments are sharing segments that are already locally
@@ -140,20 +163,6 @@ class StandbyClientSyncExecution {
if (isLocal(referenced)) {
local.add(referenced);
- }
-
- if (local.contains(referenced)) {
- continue;
- }
-
- // Short circuit for the "diamond problem". Imagine that segment S1
- // references S2 and S3 and both S2 and S3 reference S4. These
- // references form the shape of a diamond. If the segments are
- // processed in the order S1, S2, S3, then S4 is added twice to the
- // 'batch' queue. The following check prevents processing S4 twice
- // or more.
-
- if (queued.contains(referenced)) {
continue;
}
@@ -163,16 +172,31 @@ class StandbyClientSyncExecution {
// queue and transfer the segment later.
log.debug("Found reference from {} to {}", current, referenced);
-
- if (SegmentId.isDataSegmentId(referenced.getLeastSignificantBits())) {
- batch.add(referenced);
- } else {
- batch.addFirst(referenced);
- }
-
+ batch.add(referenced);
queued.add(referenced);
}
}
+
+ for (UUID id : bulk) {
+ log.info("Copying bulk segment {} from primary", id);
+ copySegmentFromPrimary(id);
+ }
+
+ for (UUID id : data) {
+ log.info("Copying data segment {} from primary", id);
+ copySegmentFromPrimary(id);
+ }
+
+ }
+
+ private Iterable<String> readReferences(UUID id) throws InterruptedException { // Fetches the references of the given segment through the client.
+ Iterable<String> references = client.getReferences(id.toString());
+ // A null result is treated as a failure and turned into an IllegalStateException instead of being passed on.
+ if (references == null) {
+ throw new IllegalStateException(String.format("Unable to read references of segment %s from primary", id));
+ }
+ // From here on the result is non-null, so callers can iterate over it directly.
+ return references;
+ }
private boolean isLocal(UUID id) {
@@ -192,12 +216,12 @@ class StandbyClientSyncExecution {
return persisted;
}
- private Segment copySegmentFromPrimary(UUID uuid) throws Exception {
+ private void copySegmentFromPrimary(UUID uuid) throws Exception {
Segment result = cache.get(uuid);
if (result != null) {
log.debug("Segment {} was found in the local cache", uuid);
- return result;
+ return;
}
byte[] data = client.getSegment(uuid.toString());
@@ -212,7 +236,6 @@ class StandbyClientSyncExecution {
store.writeSegment(segmentId, data, 0, data.length);
result = segmentId.getSegment();
cache.put(uuid, result);
- return result;
}
}
Added: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/GetReferencesRequest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/GetReferencesRequest.java?rev=1763480&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/GetReferencesRequest.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/GetReferencesRequest.java Wed Oct 5 19:34:00 2016
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment.standby.codec;
+
+public class GetReferencesRequest {
+ // Immutable value object for a 'get references' request: the requesting client's ID plus a segment ID.
+ private final String clientId;
+ // ID of the segment whose references are requested, kept as a string.
+ private final String segmentId;
+ // Stores both values as given; no validation or null check is performed.
+ public GetReferencesRequest(String clientId, String segmentId) {
+ this.clientId = clientId;
+ this.segmentId = segmentId;
+ }
+ // Returns the client ID passed to the constructor.
+ public String getClientId() {
+ return clientId;
+ }
+ // Returns the segment ID passed to the constructor.
+ public String getSegmentId() {
+ return segmentId;
+ }
+
+}
Propchange: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/GetReferencesRequest.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/GetReferencesRequestEncoder.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/GetReferencesRequestEncoder.java?rev=1763480&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/GetReferencesRequestEncoder.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/GetReferencesRequestEncoder.java Wed Oct 5 19:34:00 2016
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment.standby.codec;
+
+import java.util.List;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.MessageToMessageEncoder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class GetReferencesRequestEncoder extends MessageToMessageEncoder<GetReferencesRequest> {
+ // NOTE(review): this logger is an instance field, while GetReferencesResponseEncoder declares its logger static - consider aligning the two.
+ private final Logger log = LoggerFactory.getLogger(GetReferencesRequestEncoder.class);
+ // Turns the request into the string built by Messages.newGetReferencesRequest() and adds that string to 'out'.
+ @Override
+ protected void encode(ChannelHandlerContext ctx, GetReferencesRequest msg, List<Object> out) throws Exception {
+ log.debug("Sending request from client {} for references of segment {}", msg.getClientId(), msg.getSegmentId());
+ out.add(Messages.newGetReferencesRequest(msg.getClientId(), msg.getSegmentId()));
+ }
+
+}
Propchange: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/GetReferencesRequestEncoder.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/GetReferencesResponse.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/GetReferencesResponse.java?rev=1763480&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/GetReferencesResponse.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/GetReferencesResponse.java Wed Oct 5 19:34:00 2016
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment.standby.codec;
+
+public class GetReferencesResponse {
+ // Immutable value object for a 'get references' response. The client ID may be null: ResponseDecoder builds instances with a null client ID.
+ private final String clientId;
+ // ID of the segment the references belong to, kept as a string.
+ private final String segmentId;
+ // IDs of the referenced segments as strings; the Iterable is stored as given, without copying.
+ private final Iterable<String> references;
+ // Stores all three values as given; no validation or null check is performed.
+ public GetReferencesResponse(String clientId, String segmentId, Iterable<String> references) {
+ this.clientId = clientId;
+ this.segmentId = segmentId;
+ this.references = references;
+ }
+ // Returns the client ID passed to the constructor.
+ public String getClientId() {
+ return clientId;
+ }
+ // Returns the segment ID passed to the constructor.
+ public String getSegmentId() {
+ return segmentId;
+ }
+ // Returns the very Iterable passed to the constructor, not a copy.
+ public Iterable<String> getReferences() {
+ return references;
+ }
+
+}
Propchange: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/GetReferencesResponse.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/GetReferencesResponseEncoder.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/GetReferencesResponseEncoder.java?rev=1763480&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/GetReferencesResponseEncoder.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/GetReferencesResponseEncoder.java Wed Oct 5 19:34:00 2016
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment.standby.codec;
+
+import com.google.common.base.Charsets;
+import com.google.common.base.Joiner;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.MessageToByteEncoder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class GetReferencesResponseEncoder extends MessageToByteEncoder<GetReferencesResponse> {
+ // Encodes a GetReferencesResponse as: int length, one HEADER_REFERENCES byte, then the UTF-8 encoded body.
+ private static final Logger log = LoggerFactory.getLogger(GetReferencesResponseEncoder.class);
+ // Only the segment ID and the references are written to 'out'; the client ID is used for the debug log only.
+ @Override
+ protected void encode(ChannelHandlerContext ctx, GetReferencesResponse msg, ByteBuf out) throws Exception {
+ log.debug("Sending references of segment {} to client {}", msg.getSegmentId(), msg.getClientId());
+ encode(msg.getSegmentId(), msg.getReferences(), out);
+ }
+ // The length field counts the header byte plus the body, hence data.length + 1.
+ private void encode(String segmentId, Iterable<String> references, ByteBuf out) {
+ byte[] data = serialize(segmentId, references).getBytes(Charsets.UTF_8);
+ out.writeInt(data.length + 1);
+ out.writeByte(Messages.HEADER_REFERENCES);
+ out.writeBytes(data);
+ }
+ // Body layout: "<segmentId>:<ref1>,<ref2>,..."; with no references the body ends right after the colon.
+ private String serialize(String segmentId, Iterable<String> references) {
+ return segmentId + ":" + Joiner.on(",").join(references);
+ }
+
+}
Propchange: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/GetReferencesResponseEncoder.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/Messages.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/Messages.java?rev=1763480&r1=1763479&r2=1763480&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/Messages.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/Messages.java Wed Oct 5 19:34:00 2016
@@ -27,12 +27,16 @@ class Messages {
static final byte HEADER_BLOB = 0x02;
+ static final byte HEADER_REFERENCES = 0x03;
+
static final String GET_HEAD = "h";
static final String GET_SEGMENT = "s.";
static final String GET_BLOB = "b.";
+ static final String GET_REFERENCES = "r.";
+
private static final String MAGIC = "Standby-CMD@";
private static final String SEPARATOR = ":";
@@ -70,6 +74,14 @@ class Messages {
return newGetSegmentRequest(clientId, segmentId, true);
}
+ static String newGetReferencesRequest(String clientId, String segmentId, boolean delimited) { // Delegates to newRequest() with GET_REFERENCES followed by the segment ID as the request text.
+ return newRequest(clientId, GET_REFERENCES + segmentId, delimited);
+ }
+
+ static String newGetReferencesRequest(String clientId, String segmentId) { // Convenience overload that always passes delimited = true.
+ return newGetReferencesRequest(clientId, segmentId, true);
+ }
+
static String newGetBlobRequest(String clientId, String blobId, boolean delimited) {
return newRequest(clientId, GET_BLOB + blobId, delimited);
}
Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/RequestDecoder.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/RequestDecoder.java?rev=1763480&r1=1763479&r2=1763480&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/RequestDecoder.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/RequestDecoder.java Wed Oct 5 19:34:00 2016
@@ -44,6 +44,9 @@ public class RequestDecoder extends Mess
} else if (request.startsWith(Messages.GET_SEGMENT)) {
log.debug("Parsed 'get segment' message");
out.add(new GetSegmentRequest(Messages.extractClientFrom(msg), request.substring(Messages.GET_SEGMENT.length())));
+ } else if (request.startsWith(Messages.GET_REFERENCES)) {
+ log.debug("Parsed 'get references' message");
+ out.add(new GetReferencesRequest(Messages.extractClientFrom(msg), request.substring(Messages.GET_REFERENCES.length())));
} else {
log.debug("Received unrecognizable message {}, dropping", msg);
}
Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/ResponseDecoder.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/ResponseDecoder.java?rev=1763480&r1=1763479&r2=1763480&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/ResponseDecoder.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/ResponseDecoder.java Wed Oct 5 19:34:00 2016
@@ -17,6 +17,9 @@
package org.apache.jackrabbit.oak.segment.standby.codec;
+import static java.util.Arrays.asList;
+import static java.util.Collections.emptyList;
+
import java.util.List;
import java.util.UUID;
@@ -49,6 +52,10 @@ public class ResponseDecoder extends Byt
log.debug("Decoding 'get blob' response");
decodeGetBlobResponse(length, in, out);
break;
+ case Messages.HEADER_REFERENCES:
+ log.debug("Decoding 'get references' response");
+ decodeGetReferencesResponse(length, in, out);
+ break;
default:
log.debug("Invalid type, dropping message");
}
@@ -101,6 +108,33 @@ public class ResponseDecoder extends Byt
out.add(new GetBlobResponse(null, blobId, blobData));
}
+    /**
+     * Decodes the body of a 'get references' response and, if it is well formed,
+     * adds a {@link GetReferencesResponse} to {@code out}.
+     * <p>
+     * The body is interpreted as the UTF-8 text {@code segmentId:ref1,ref2,...};
+     * the part after the first colon may be empty, which yields an empty list of
+     * references. A body without a colon is dropped and nothing is added to
+     * {@code out}.
+     *
+     * @param length message length as passed by the caller; {@code length - 1}
+     *               bytes are read from {@code in}
+     * @param in     buffer the body is read from
+     * @param out    list receiving the decoded response
+     */
+    private void decodeGetReferencesResponse(int length, ByteBuf in, List<Object> out) {
+        byte[] data = new byte[length - 1];
+        in.readBytes(data);
+        String body = new String(data, Charsets.UTF_8);
+
+        int colon = body.indexOf(':');
+        if (colon < 0) {
+            // Without a separator the segment ID cannot be told apart from its
+            // references; log the drop instead of discarding the message silently.
+            log.debug("Malformed 'get references' response, dropping message");
+            return;
+        }
+
+        String segmentId = body.substring(0, colon);
+        String referencesList = body.substring(colon + 1);
+
+        List<String> references;
+        if (referencesList.isEmpty()) {
+            // "".split(",") would produce a single empty string, not an empty list.
+            references = emptyList();
+        } else {
+            references = asList(referencesList.split(","));
+        }
+
+        out.add(new GetReferencesResponse(null, segmentId, references));
+    }
+
/**
 * Hashes {@code data} via {@code Hashing.murmur3_32()} and returns the
 * resulting hash code widened with {@code padToLong()}.
 */
private long hash(byte[] data) {
return Hashing.murmur3_32().newHasher().putBytes(data).hash().padToLong();
}
Added: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbyReferencesReader.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbyReferencesReader.java?rev=1763480&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbyReferencesReader.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbyReferencesReader.java Wed Oct 5 19:34:00 2016
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment.standby.server;
+
+import static com.google.common.collect.Lists.newArrayList;
+
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.jackrabbit.oak.segment.Segment;
+import org.apache.jackrabbit.oak.segment.SegmentId;
+import org.apache.jackrabbit.oak.segment.SegmentNotFoundException;
+import org.apache.jackrabbit.oak.segment.file.FileStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link StandbyReferencesReader} that looks segments up in a {@link FileStore}
+ * and reports the IDs of the segments they reference.
+ */
+class DefaultStandbyReferencesReader implements StandbyReferencesReader {
+
+    private static final Logger log = LoggerFactory.getLogger(DefaultStandbyReferencesReader.class);
+
+    private final FileStore store;
+
+    public DefaultStandbyReferencesReader(FileStore store) {
+        this.store = store;
+    }
+
+    /**
+     * Returns the string form of every segment ID referenced by the segment
+     * identified by {@code id}.
+     *
+     * @param id the segment ID in UUID string form
+     * @return the referenced segment IDs, or {@code null} if {@code id} is not a
+     *         valid UUID string or the segment cannot be read
+     */
+    @Override
+    public Iterable<String> readReferences(String id) {
+        UUID uuid;
+
+        try {
+            uuid = UUID.fromString(id);
+        } catch (IllegalArgumentException e) {
+            // The ID is an arbitrary string taken from a request; report a
+            // malformed one the same way as a missing segment instead of letting
+            // the exception escape to the caller.
+            log.warn("Invalid segment ID {}", id, e);
+            return null;
+        }
+
+        long msb = uuid.getMostSignificantBits();
+        long lsb = uuid.getLeastSignificantBits();
+
+        Segment segment = readSegment(store.newSegmentId(msb, lsb));
+
+        if (segment == null) {
+            return null;
+        }
+
+        List<String> references = newArrayList();
+
+        for (int i = 0; i < segment.getReferencedSegmentIdCount(); i++) {
+            references.add(segment.getReferencedSegmentId(i).toString());
+        }
+
+        return references;
+    }
+
+    /**
+     * Reads the segment from the store, returning {@code null} (after logging a
+     * warning) when the store reports it as not found.
+     */
+    private Segment readSegment(SegmentId id) {
+        try {
+            return store.readSegment(id);
+        } catch (SegmentNotFoundException e) {
+            // A trailing Throwable is logged with its stack trace by SLF4J.
+            log.warn("Unable to read segment {}", id, e);
+            return null;
+        }
+    }
+
+}
Propchange: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbyReferencesReader.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetReferencesRequestHandler.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetReferencesRequestHandler.java?rev=1763480&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetReferencesRequestHandler.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetReferencesRequestHandler.java Wed Oct 5 19:34:00 2016
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment.standby.server;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import org.apache.jackrabbit.oak.segment.standby.codec.GetReferencesRequest;
+import org.apache.jackrabbit.oak.segment.standby.codec.GetReferencesResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Serves 'get references' requests: asks a {@link StandbyReferencesReader} for
+ * the references of the requested segment and writes them back to the channel
+ * as a {@link GetReferencesResponse}.
+ */
+class GetReferencesRequestHandler extends SimpleChannelInboundHandler<GetReferencesRequest> {
+
+    private static final Logger log = LoggerFactory.getLogger(GetReferencesRequestHandler.class);
+
+    private final StandbyReferencesReader reader;
+
+    public GetReferencesRequestHandler(StandbyReferencesReader reader) {
+        this.reader = reader;
+    }
+
+    /**
+     * Replies with the references of the requested segment. When the reader
+     * returns {@code null} the request is logged and no response is written.
+     */
+    @Override
+    protected void channelRead0(ChannelHandlerContext ctx, GetReferencesRequest msg) throws Exception {
+        log.debug("Reading references of segment {} for client {}", msg.getSegmentId(), msg.getClientId());
+
+        Iterable<String> found = reader.readReferences(msg.getSegmentId());
+
+        if (found != null) {
+            ctx.writeAndFlush(new GetReferencesResponse(msg.getClientId(), msg.getSegmentId(), found));
+        } else {
+            log.debug("References for segment {} not found, discarding request from client {}", msg.getSegmentId(), msg.getClientId());
+        }
+    }
+
+}
Propchange: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetReferencesRequestHandler.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyReferencesReader.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyReferencesReader.java?rev=1763480&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyReferencesReader.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyReferencesReader.java Wed Oct 5 19:34:00 2016
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment.standby.server;
+
+/**
+ * Looks up the segments referenced by a given segment.
+ */
+interface StandbyReferencesReader {
+
+ /**
+ * Reads the references of the segment identified by {@code segmentId}.
+ *
+ * @param segmentId the ID of the segment, as a string
+ * @return the referenced segment IDs as strings. NOTE(review): the
+ * implementation in this commit returns {@code null} when the segment
+ * cannot be read - confirm that this is part of the contract.
+ */
+ Iterable<String> readReferences(String segmentId);
+
+}
Propchange: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyReferencesReader.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyServer.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyServer.java?rev=1763480&r1=1763479&r2=1763480&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyServer.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyServer.java Wed Oct 5 19:34:00 2016
@@ -40,8 +40,10 @@ import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.SelfSignedCertificate;
import io.netty.util.CharsetUtil;
+import org.apache.jackrabbit.oak.segment.file.FileStore;
import org.apache.jackrabbit.oak.segment.standby.codec.GetBlobResponseEncoder;
import org.apache.jackrabbit.oak.segment.standby.codec.GetHeadResponseEncoder;
+import org.apache.jackrabbit.oak.segment.standby.codec.GetReferencesResponseEncoder;
import org.apache.jackrabbit.oak.segment.standby.codec.GetSegmentResponseEncoder;
import org.apache.jackrabbit.oak.segment.standby.codec.RequestDecoder;
import org.apache.jackrabbit.oak.segment.standby.store.CommunicationObserver;
@@ -162,13 +164,17 @@ class StandbyServer implements AutoClose
p.addLast(new GetHeadResponseEncoder());
p.addLast(new GetSegmentResponseEncoder());
p.addLast(new GetBlobResponseEncoder());
+ p.addLast(new GetReferencesResponseEncoder());
p.addLast(new ResponseObserverHandler(builder.observer));
// Handlers
- p.addLast(new GetHeadRequestHandler(new DefaultStandbyHeadReader(builder.storeProvider.provideStore())));
- p.addLast(new GetSegmentRequestHandler(new DefaultStandbySegmentReader(builder.storeProvider.provideStore())));
- p.addLast(new GetBlobRequestHandler(new DefaultStandbyBlobReader(builder.storeProvider.provideStore())));
+ FileStore store = builder.storeProvider.provideStore();
+
+ p.addLast(new GetHeadRequestHandler(new DefaultStandbyHeadReader(store)));
+ p.addLast(new GetSegmentRequestHandler(new DefaultStandbySegmentReader(store)));
+ p.addLast(new GetBlobRequestHandler(new DefaultStandbyBlobReader(store)));
+ p.addLast(new GetReferencesRequestHandler(new DefaultStandbyReferencesReader(store)));
}
});
}
Added: jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/codec/GetReferencesResponseEncoderTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/codec/GetReferencesResponseEncoderTest.java?rev=1763480&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/codec/GetReferencesResponseEncoderTest.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/codec/GetReferencesResponseEncoderTest.java Wed Oct 5 19:34:00 2016
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment.standby.codec;
+
+import static java.util.Arrays.asList;
+import static org.junit.Assert.assertEquals;
+
+import com.google.common.base.Charsets;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.embedded.EmbeddedChannel;
+import org.junit.Test;
+
+/**
+ * Checks the bytes produced by {@link GetReferencesResponseEncoder}.
+ */
+public class GetReferencesResponseEncoderTest {
+
+    /**
+     * A response for segment "a" referencing "b" and "c" must be written as an
+     * int holding the body length plus one, the references header byte, and the
+     * UTF-8 text "a:b,c".
+     */
+    @Test
+    public void encodeResponse() throws Exception {
+        EmbeddedChannel encoder = new EmbeddedChannel(new GetReferencesResponseEncoder());
+        encoder.writeOutbound(new GetReferencesResponse("clientId", "a", asList("b", "c")));
+        ByteBuf actual = (ByteBuf) encoder.readOutbound();
+
+        byte[] payload = "a:b,c".getBytes(Charsets.UTF_8);
+
+        ByteBuf expected = Unpooled.buffer()
+                .writeInt(payload.length + 1)
+                .writeByte(Messages.HEADER_REFERENCES)
+                .writeBytes(payload);
+
+        assertEquals(expected, actual);
+    }
+
+}
Propchange: jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/codec/GetReferencesResponseEncoderTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/codec/RequestDecoderTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/codec/RequestDecoderTest.java?rev=1763480&r1=1763479&r2=1763480&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/codec/RequestDecoderTest.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/codec/RequestDecoderTest.java Wed Oct 5 19:34:00 2016
@@ -52,6 +52,15 @@ public class RequestDecoderTest {
}
@Test
+    public void shouldDecodeValidGetReferencesRequests() throws Exception {
+        EmbeddedChannel decoder = new EmbeddedChannel(new RequestDecoder());
+
+        // Feed the decoder a well-formed request and take out what it produced.
+        decoder.writeInbound(Messages.newGetReferencesRequest("clientId", "segmentId", false));
+        GetReferencesRequest decoded = (GetReferencesRequest) decoder.readInbound();
+
+        // Both the client and the segment ID must survive the round trip.
+        assertEquals("clientId", decoded.getClientId());
+        assertEquals("segmentId", decoded.getSegmentId());
+    }
+
+ @Test
public void shouldDropInvalidMessages() throws Exception {
EmbeddedChannel channel = new EmbeddedChannel(new RequestDecoder());
channel.writeInbound("Standby-CMD@clientId:z");
Modified: jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/codec/ResponseDecoderTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/codec/ResponseDecoderTest.java?rev=1763480&r1=1763479&r2=1763480&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/codec/ResponseDecoderTest.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/codec/ResponseDecoderTest.java Wed Oct 5 19:34:00 2016
@@ -17,10 +17,15 @@
package org.apache.jackrabbit.oak.segment.standby.codec;
+import static com.google.common.collect.Iterables.elementsEqual;
+import static com.google.common.collect.Lists.newArrayList;
+import static java.util.Arrays.asList;
+import static java.util.Collections.emptyList;
import static org.apache.jackrabbit.oak.segment.standby.StandbyTestUtils.hash;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
import java.util.UUID;
@@ -122,6 +127,54 @@ public class ResponseDecoderTest {
}
@Test
+    public void shouldDecodeValidGetReferencesResponses() throws Exception {
+        // Frame: int length (body plus header byte), header byte, UTF-8 body.
+        byte[] body = "a:b,c".getBytes(Charsets.UTF_8);
+        ByteBuf frame = Unpooled.buffer()
+                .writeInt(body.length + 1)
+                .writeByte(Messages.HEADER_REFERENCES)
+                .writeBytes(body);
+
+        EmbeddedChannel decoder = new EmbeddedChannel(new ResponseDecoder());
+        decoder.writeInbound(frame);
+
+        GetReferencesResponse decoded = (GetReferencesResponse) decoder.readInbound();
+        assertEquals("a", decoded.getSegmentId());
+        assertTrue(elementsEqual(asList("b", "c"), decoded.getReferences()));
+    }
+
+    /**
+     * A body with exactly one reference after the colon decodes to a
+     * one-element list.
+     */
+    @Test
+    public void shouldDecodeValidSingleElementGetReferencesResponses() throws Exception {
+        byte[] body = "a:b".getBytes(Charsets.UTF_8);
+        ByteBuf frame = Unpooled.buffer()
+                .writeInt(body.length + 1)
+                .writeByte(Messages.HEADER_REFERENCES)
+                .writeBytes(body);
+
+        EmbeddedChannel decoder = new EmbeddedChannel(new ResponseDecoder());
+        decoder.writeInbound(frame);
+
+        GetReferencesResponse decoded = (GetReferencesResponse) decoder.readInbound();
+        assertEquals("a", decoded.getSegmentId());
+        assertTrue(elementsEqual(newArrayList("b"), decoded.getReferences()));
+    }
+
+    /**
+     * A body with nothing after the colon decodes to an empty list of
+     * references.
+     */
+    @Test
+    public void shouldDecodeValidZeroElementsGetReferencesResponses() throws Exception {
+        byte[] body = "a:".getBytes(Charsets.UTF_8);
+        ByteBuf frame = Unpooled.buffer()
+                .writeInt(body.length + 1)
+                .writeByte(Messages.HEADER_REFERENCES)
+                .writeBytes(body);
+
+        EmbeddedChannel decoder = new EmbeddedChannel(new ResponseDecoder());
+        decoder.writeInbound(frame);
+
+        GetReferencesResponse decoded = (GetReferencesResponse) decoder.readInbound();
+        assertEquals("a", decoded.getSegmentId());
+        assertTrue(elementsEqual(emptyList(), decoded.getReferences()));
+    }
+
+ @Test
public void shouldDropInvalidGetSegmentResponses() throws Exception {
UUID uuid = new UUID(1, 2);
byte[] data = new byte[] {3, 4, 5};