You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by fr...@apache.org on 2016/09/12 16:47:28 UTC

svn commit: r1760405 [1/2] - in /jackrabbit/oak/trunk/oak-segment-tar/src: main/java/org/apache/jackrabbit/oak/segment/standby/server/ test/java/org/apache/jackrabbit/oak/segment/standby/server/

Author: frm
Date: Mon Sep 12 16:47:28 2016
New Revision: 1760405

URL: http://svn.apache.org/viewvc?rev=1760405&view=rev
Log:
OAK-4785 - Split StandbyServerHandler into smaller, more cohesive handlers

Added:
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbyBlobReader.java   (with props)
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbyHeadReader.java   (with props)
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbySegmentReader.java   (with props)
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetBlobRequestHandler.java   (with props)
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetBlobResponse.java   (with props)
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetBlobResponseEncoder.java   (with props)
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetHeadRequestHandler.java   (with props)
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetHeadResponse.java   (with props)
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetHeadResponseEncoder.java   (with props)
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetSegmentRequestHandler.java   (with props)
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetSegmentResponse.java   (with props)
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetSegmentResponseEncoder.java   (with props)
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/RequestObserverHandler.java   (with props)
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/ResponseObserverHandler.java   (with props)
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyBlobReader.java   (with props)
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyHeadReader.java   (with props)
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbySegmentReader.java   (with props)
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StateConsumer.java   (with props)
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StateHandler.java   (with props)
    jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/server/GetBlobRequestHandlerTest.java   (with props)
    jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/server/GetBlobResponseEncoderTest.java   (with props)
    jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/server/GetHeadRequestHandlerTest.java   (with props)
    jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/server/GetHeadResponseEncoderTest.java   (with props)
    jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/server/GetSegmentRequestHandlerTest.java   (with props)
    jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/server/GetSegmentResponseEncoderTest.java   (with props)
    jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/server/ServerTestUtils.java   (with props)
Removed:
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyServerHandler.java
Modified:
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetSegmentRequest.java
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/RequestDecoder.java
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyServer.java
    jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/server/RequestDecoderTest.java

Added: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbyBlobReader.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbyBlobReader.java?rev=1760405&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbyBlobReader.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbyBlobReader.java Mon Sep 12 16:47:28 2016
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment.standby.server;
+
+import org.apache.jackrabbit.oak.api.Blob;
+import org.apache.jackrabbit.oak.plugins.blob.BlobStoreBlob;
+import org.apache.jackrabbit.oak.segment.file.FileStore;
+import org.apache.jackrabbit.oak.spi.blob.BlobStore;
+
+/**
+ * {@link StandbyBlobReader} that resolves blob IDs against the blob store of a
+ * {@link FileStore}.
+ */
+class DefaultStandbyBlobReader implements StandbyBlobReader {
+
+    private final FileStore store;
+
+    DefaultStandbyBlobReader(FileStore store) {
+        this.store = store;
+    }
+
+    /**
+     * Wraps the given blob ID in a blob backed by the store's blob store.
+     *
+     * @param blobId the ID of the requested blob
+     * @return the blob, or {@code null} if the store has no blob store
+     */
+    @Override
+    public Blob readBlob(String blobId) {
+        BlobStore backend = store.getBlobStore();
+
+        if (backend == null) {
+            return null;
+        }
+
+        return new BlobStoreBlob(backend, blobId);
+    }
+
+}

Propchange: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbyBlobReader.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbyHeadReader.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbyHeadReader.java?rev=1760405&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbyHeadReader.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbyHeadReader.java Mon Sep 12 16:47:28 2016
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment.standby.server;
+
+import org.apache.jackrabbit.oak.segment.RecordId;
+import org.apache.jackrabbit.oak.segment.file.FileStore;
+
+/**
+ * {@link StandbyHeadReader} that reads the head record ID directly from a
+ * {@link FileStore}.
+ */
+class DefaultStandbyHeadReader implements StandbyHeadReader {
+
+    private final FileStore store;
+
+    DefaultStandbyHeadReader(FileStore store) {
+        this.store = store;
+    }
+
+    /**
+     * Returns the record ID of the head state currently exposed by the store.
+     */
+    @Override
+    public RecordId readHeadRecordId() {
+        return store.getHead().getRecordId();
+    }
+
+}

Propchange: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbyHeadReader.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbySegmentReader.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbySegmentReader.java?rev=1760405&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbySegmentReader.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbySegmentReader.java Mon Sep 12 16:47:28 2016
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment.standby.server;
+
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.jackrabbit.oak.segment.Segment;
+import org.apache.jackrabbit.oak.segment.SegmentId;
+import org.apache.jackrabbit.oak.segment.SegmentNotFoundException;
+import org.apache.jackrabbit.oak.segment.file.FileStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link StandbySegmentReader} that reads segments from a {@link FileStore},
+ * retrying for a while when a segment cannot be found.
+ */
+class DefaultStandbySegmentReader implements StandbySegmentReader {
+
+    private static final Logger log = LoggerFactory.getLogger(DefaultStandbySegmentReader.class);
+
+    /** Maximum number of times a single segment read is attempted. */
+    private static final int READ_ATTEMPTS = 10;
+
+    /** Pause between two consecutive read attempts, in milliseconds. */
+    private static final long RETRY_DELAY_MILLIS = 2000;
+
+    private final FileStore store;
+
+    DefaultStandbySegmentReader(FileStore store) {
+        this.store = store;
+    }
+
+    /**
+     * Reads the segment identified by the given UUID. If the segment is not
+     * found, the read is retried up to {@link #READ_ATTEMPTS} times in total,
+     * blocking the calling thread for {@link #RETRY_DELAY_MILLIS} milliseconds
+     * between attempts.
+     *
+     * @param uuid the identifier of the requested segment
+     * @return the segment, or {@code null} if every attempt failed or the
+     *         thread was interrupted while waiting (the interrupt flag is
+     *         restored in that case)
+     */
+    @Override
+    public Segment readSegment(UUID uuid) {
+        SegmentId id = store.newSegmentId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits());
+
+        for (int attempt = 0; attempt < READ_ATTEMPTS; attempt++) {
+            try {
+                // Pause only between attempts: sleeping after the last failed
+                // attempt would just delay the negative result.
+                if (attempt > 0) {
+                    TimeUnit.MILLISECONDS.sleep(RETRY_DELAY_MILLIS);
+                }
+
+                return store.readSegment(id);
+            } catch (SegmentNotFoundException e) {
+                log.warn("Unable to read segment, waiting...", e);
+            } catch (InterruptedException e) {
+                // Preserve the interrupt for the caller and give up.
+                Thread.currentThread().interrupt();
+                return null;
+            }
+        }
+
+        return null;
+    }
+
+}

Propchange: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbySegmentReader.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetBlobRequestHandler.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetBlobRequestHandler.java?rev=1760405&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetBlobRequestHandler.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetBlobRequestHandler.java Mon Sep 12 16:47:28 2016
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment.standby.server;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import org.apache.jackrabbit.oak.api.Blob;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Handles 'get blob' requests and produces 'get blob' responses. A response
+ * is written only if the reader returns a blob for the requested ID.
+ */
+class GetBlobRequestHandler extends SimpleChannelInboundHandler<GetBlobRequest> {
+
+    private static final Logger log = LoggerFactory.getLogger(GetBlobRequestHandler.class);
+
+    private final StandbyBlobReader reader;
+
+    GetBlobRequestHandler(StandbyBlobReader reader) {
+        this.reader = reader;
+    }
+
+    @Override
+    protected void channelRead0(ChannelHandlerContext ctx, GetBlobRequest msg) throws Exception {
+        String blobId = msg.getBlobId();
+        String clientId = msg.getClientId();
+
+        log.debug("Reading blob {} for client {}", blobId, clientId);
+
+        Blob blob = reader.readBlob(blobId);
+
+        if (blob != null) {
+            ctx.writeAndFlush(new GetBlobResponse(clientId, blob));
+            return;
+        }
+
+        // No blob means no response: the request is dropped without writing
+        // anything back to the channel.
+        log.debug("Blob {} not found, discarding request from client {}", blobId, clientId);
+    }
+
+}

Propchange: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetBlobRequestHandler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetBlobResponse.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetBlobResponse.java?rev=1760405&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetBlobResponse.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetBlobResponse.java Mon Sep 12 16:47:28 2016
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment.standby.server;
+
+import org.apache.jackrabbit.oak.api.Blob;
+
+/**
+ * A 'get blob' response: the blob to send, paired with the ID of the client
+ * whose request it answers.
+ */
+class GetBlobResponse {
+
+    // ID of the client that issued the request.
+    private final String clientId;
+
+    // The blob to be sent back.
+    private final Blob blob;
+
+    GetBlobResponse(String clientId, Blob blob) {
+        this.clientId = clientId;
+        this.blob = blob;
+    }
+
+    /** Returns the ID of the client that issued the request. */
+    String getClientId() {
+        return clientId;
+    }
+
+    /** Returns the blob carried by this response. */
+    Blob getBlob() {
+        return blob;
+    }
+
+}

Propchange: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetBlobResponse.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetBlobResponseEncoder.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetBlobResponseEncoder.java?rev=1760405&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetBlobResponseEncoder.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetBlobResponseEncoder.java Mon Sep 12 16:47:28 2016
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment.standby.server;
+
+import java.io.InputStream;
+import java.nio.charset.Charset;
+
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.MessageToByteEncoder;
+import org.apache.commons.io.IOUtils;
+import org.apache.jackrabbit.oak.api.Blob;
+import org.apache.jackrabbit.oak.segment.standby.codec.Messages;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Encodes a 'get blob' response.
+ */
+class GetBlobResponseEncoder extends MessageToByteEncoder<GetBlobResponse> {
+
+    private static final Logger log = LoggerFactory.getLogger(GetBlobResponseEncoder.class);
+
+    @Override
+    protected void encode(ChannelHandlerContext ctx, GetBlobResponse msg, ByteBuf out) throws Exception {
+        log.debug("Sending blob {} to client {}", msg.getBlob().getContentIdentity(), msg.getClientId());
+        encode(msg.getBlob(), out);
+    }
+
+    /**
+     * Writes the blob to {@code out} in this order: content length (int),
+     * blob message header (byte), ID length (int), UTF-8 encoded ID, Murmur3
+     * 32-bit hash of the content padded to a long, and the content itself.
+     */
+    private void encode(Blob b, ByteBuf out) throws Exception {
+        byte[] content;
+
+        // The whole blob is read into memory before anything is written.
+        try (InputStream in = b.getNewStream()) {
+            content = IOUtils.toByteArray(in);
+        }
+
+        Hasher hasher = Hashing.murmur3_32().newHasher();
+        long checksum = hasher.putBytes(content).hash().padToLong();
+
+        // The leading length counts the content bytes only, not the header,
+        // ID or hash that follow it.
+        // NOTE(review): presumably the receiving decoder expects exactly
+        // this - confirm before changing.
+        out.writeInt(content.length);
+        out.writeByte(Messages.HEADER_BLOB);
+
+        byte[] id = b.getContentIdentity().getBytes(Charset.forName("UTF-8"));
+        out.writeInt(id.length);
+        out.writeBytes(id);
+
+        out.writeLong(checksum);
+        out.writeBytes(content);
+    }
+
+}

Propchange: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetBlobResponseEncoder.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetHeadRequestHandler.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetHeadRequestHandler.java?rev=1760405&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetHeadRequestHandler.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetHeadRequestHandler.java Mon Sep 12 16:47:28 2016
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment.standby.server;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import org.apache.jackrabbit.oak.segment.RecordId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Handles 'get head' requests and produces 'get head' responses. A response is
+ * generated if and only if the record ID of the head root state can be read.
+ */
+class GetHeadRequestHandler extends SimpleChannelInboundHandler<GetHeadRequest> {
+
+    // The logger is bound to this handler, not to the request type, so that
+    // log output is attributed to the class that actually emits it.
+    private static final Logger log = LoggerFactory.getLogger(GetHeadRequestHandler.class);
+
+    private final StandbyHeadReader reader;
+
+    GetHeadRequestHandler(StandbyHeadReader reader) {
+        this.reader = reader;
+    }
+
+    /**
+     * Reads the head record ID and writes a {@link GetHeadResponse} addressed
+     * to the requesting client. If no record ID is available the request is
+     * dropped and nothing is written to the channel.
+     */
+    @Override
+    protected void channelRead0(ChannelHandlerContext ctx, GetHeadRequest msg) throws Exception {
+        log.debug("Reading head for client {}", msg.getClientId());
+
+        RecordId id = reader.readHeadRecordId();
+
+        if (id == null) {
+            log.debug("Head not found, discarding request from client {}", msg.getClientId());
+            return;
+        }
+
+        ctx.writeAndFlush(new GetHeadResponse(msg.getClientId(), id));
+    }
+
+}

Propchange: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetHeadRequestHandler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetHeadResponse.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetHeadResponse.java?rev=1760405&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetHeadResponse.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetHeadResponse.java Mon Sep 12 16:47:28 2016
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment.standby.server;
+
+import org.apache.jackrabbit.oak.segment.RecordId;
+
+/**
+ * A 'get head' response: the head record ID, paired with the ID of the client
+ * whose request it answers.
+ */
+class GetHeadResponse {
+
+    // ID of the client that issued the request.
+    private final String clientId;
+
+    // Record ID of the head state to be sent back.
+    private final RecordId headRecordId;
+
+    GetHeadResponse(String clientId, RecordId headRecordId) {
+        this.clientId = clientId;
+        this.headRecordId = headRecordId;
+    }
+
+    /** Returns the ID of the client that issued the request. */
+    String getClientId() {
+        return clientId;
+    }
+
+    /** Returns the head record ID carried by this response. */
+    RecordId getHeadRecordId() {
+        return headRecordId;
+    }
+
+}

Propchange: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetHeadResponse.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetHeadResponseEncoder.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetHeadResponseEncoder.java?rev=1760405&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetHeadResponseEncoder.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetHeadResponseEncoder.java Mon Sep 12 16:47:28 2016
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment.standby.server;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.MessageToByteEncoder;
+import io.netty.util.CharsetUtil;
+import org.apache.jackrabbit.oak.segment.standby.codec.Messages;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Encodes a 'get head' response.
+ */
+class GetHeadResponseEncoder extends MessageToByteEncoder<GetHeadResponse> {
+
+    private static final Logger log = LoggerFactory.getLogger(GetHeadResponseEncoder.class);
+
+    @Override
+    protected void encode(ChannelHandlerContext ctx, GetHeadResponse msg, ByteBuf out) throws Exception {
+        log.debug("Sending head {} to client {}", msg.getHeadRecordId(), msg.getClientId());
+        byte[] body = msg.getHeadRecordId().toString().getBytes(CharsetUtil.UTF_8);
+        out.writeInt(body.length + 1);
+        out.writeByte(Messages.HEADER_RECORD);
+        out.writeBytes(body);
+    }
+
+}

Propchange: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetHeadResponseEncoder.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetSegmentRequest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetSegmentRequest.java?rev=1760405&r1=1760404&r2=1760405&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetSegmentRequest.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetSegmentRequest.java Mon Sep 12 16:47:28 2016
@@ -17,13 +17,15 @@
 
 package org.apache.jackrabbit.oak.segment.standby.server;
 
+import java.util.UUID;
+
 class GetSegmentRequest {
 
     private final String clientId;
 
-    private final String segmentId;
+    private final UUID segmentId;
 
-    GetSegmentRequest(String clientId, String segmentId) {
+    GetSegmentRequest(String clientId, UUID segmentId) {
         this.clientId = clientId;
         this.segmentId = segmentId;
     }
@@ -32,7 +34,7 @@ class GetSegmentRequest {
         return clientId;
     }
 
-    public String getSegmentId() {
+    public UUID getSegmentId() {
         return segmentId;
     }
 

Added: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetSegmentRequestHandler.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetSegmentRequestHandler.java?rev=1760405&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetSegmentRequestHandler.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetSegmentRequestHandler.java Mon Sep 12 16:47:28 2016
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment.standby.server;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import org.apache.jackrabbit.oak.segment.Segment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Handles 'get segment' requests by looking up the requested segment through a
+ * {@link StandbySegmentReader} and writing it back to the channel.
+ */
+class GetSegmentRequestHandler extends SimpleChannelInboundHandler<GetSegmentRequest> {
+
+    private static final Logger log = LoggerFactory.getLogger(GetSegmentRequestHandler.class);
+
+    private final StandbySegmentReader reader;
+
+    /**
+     * @param reader the reader used to look up the requested segments.
+     */
+    GetSegmentRequestHandler(StandbySegmentReader reader) {
+        this.reader = reader;
+    }
+
+    /**
+     * Reads the requested segment and, if the reader returns one, writes and
+     * flushes a {@link GetSegmentResponse}. If the reader returns
+     * {@code null}, nothing is written in response to the request.
+     */
+    @Override
+    protected void channelRead0(ChannelHandlerContext ctx, GetSegmentRequest msg) throws Exception {
+        log.debug("Reading segment {} for client {}", msg.getSegmentId(), msg.getClientId());
+
+        Segment found = reader.readSegment(msg.getSegmentId());
+
+        if (found != null) {
+            ctx.writeAndFlush(new GetSegmentResponse(msg.getClientId(), found));
+        } else {
+            log.debug("Segment {} not found, discarding request from client {}", msg.getSegmentId(), msg.getClientId());
+        }
+    }
+
+}

Propchange: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetSegmentRequestHandler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetSegmentResponse.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetSegmentResponse.java?rev=1760405&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetSegmentResponse.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetSegmentResponse.java Mon Sep 12 16:47:28 2016
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment.standby.server;
+
+import org.apache.jackrabbit.oak.segment.Segment;
+
+/**
+ * Response to a 'get segment' request, pairing a client ID with the segment
+ * to send back.
+ */
+class GetSegmentResponse {
+
+    private final String clientId;
+    private final Segment segment;
+
+    /**
+     * @param clientId the ID of the client the response is addressed to.
+     * @param segment  the segment carried by this response.
+     */
+    GetSegmentResponse(String clientId, Segment segment) {
+        this.segment = segment;
+        this.clientId = clientId;
+    }
+
+    /**
+     * @return the client ID passed at construction time.
+     */
+    String getClientId() {
+        return clientId;
+    }
+
+    /**
+     * @return the segment passed at construction time.
+     */
+    Segment getSegment() {
+        return segment;
+    }
+
+}

Propchange: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetSegmentResponse.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetSegmentResponseEncoder.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetSegmentResponseEncoder.java?rev=1760405&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetSegmentResponseEncoder.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetSegmentResponseEncoder.java Mon Sep 12 16:47:28 2016
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment.standby.server;
+
+import java.io.ByteArrayOutputStream;
+
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.MessageToByteEncoder;
+import org.apache.jackrabbit.oak.segment.Segment;
+import org.apache.jackrabbit.oak.segment.SegmentId;
+import org.apache.jackrabbit.oak.segment.standby.codec.Messages;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Serializes a {@link GetSegmentResponse} into the outgoing byte buffer as a
+ * length-prefixed frame carrying the segment ID, a hash of the segment data
+ * and the segment data itself.
+ */
+class GetSegmentResponseEncoder extends MessageToByteEncoder<GetSegmentResponse> {
+
+    private static final Logger log = LoggerFactory.getLogger(GetSegmentResponseEncoder.class);
+
+    // Bytes written before the segment data: length (4), message type (1),
+    // segment ID (8 + 8) and hash (8).
+    private static final int EXTRA_HEADERS_LEN = 29;
+
+    // Same as above, minus the 4 bytes of the length field itself.
+    private static final int EXTRA_HEADERS_WO_SIZE = EXTRA_HEADERS_LEN - 4;
+
+    @Override
+    protected void encode(ChannelHandlerContext ctx, GetSegmentResponse msg, ByteBuf out) throws Exception {
+        Segment segment = msg.getSegment();
+        log.debug("Sending segment {} to client {}", segment.getSegmentId(), msg.getClientId());
+        writeFrame(segment, out);
+    }
+
+    /**
+     * Writes one frame: length, message type, segment ID, hash, segment data.
+     * The length field counts every byte that follows it.
+     */
+    private void writeFrame(Segment segment, ByteBuf out) throws Exception {
+        ByteArrayOutputStream buffer = new ByteArrayOutputStream(segment.size());
+        segment.writeTo(buffer);
+        byte[] data = buffer.toByteArray();
+
+        // 32-bit Murmur3 hash of the serialized segment, padded to a long.
+        Hasher hasher = Hashing.murmur3_32().newHasher();
+        hasher.putBytes(data);
+        long hash = hasher.hash().padToLong();
+
+        SegmentId id = segment.getSegmentId();
+
+        out.writeInt(data.length + EXTRA_HEADERS_WO_SIZE);
+        out.writeByte(Messages.HEADER_SEGMENT);
+        out.writeLong(id.getMostSignificantBits());
+        out.writeLong(id.getLeastSignificantBits());
+        out.writeLong(hash);
+        out.writeBytes(data);
+    }
+
+}

Propchange: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetSegmentResponseEncoder.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/RequestDecoder.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/RequestDecoder.java?rev=1760405&r1=1760404&r2=1760405&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/RequestDecoder.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/RequestDecoder.java Mon Sep 12 16:47:28 2016
@@ -18,6 +18,7 @@
 package org.apache.jackrabbit.oak.segment.standby.server;
 
 import java.util.List;
+import java.util.UUID;
 
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.handler.codec.MessageToMessageDecoder;
@@ -45,7 +46,7 @@ class RequestDecoder extends MessageToMe
             out.add(new GetHeadRequest(Messages.extractClientFrom(msg)));
         } else if (request.startsWith(Messages.GET_SEGMENT)) {
             log.debug("Parsed 'get segment' message");
-            out.add(new GetSegmentRequest(Messages.extractClientFrom(msg), request.substring(Messages.GET_SEGMENT.length())));
+            out.add(new GetSegmentRequest(Messages.extractClientFrom(msg), UUID.fromString(request.substring(Messages.GET_SEGMENT.length()))));
         } else if (request.startsWith(Messages.GET_BLOB)) {
             log.debug("Parsed 'get blob' message");
             out.add(new GetBlobRequest(Messages.extractClientFrom(msg), request.substring(Messages.GET_BLOB.length())));

Added: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/RequestObserverHandler.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/RequestObserverHandler.java?rev=1760405&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/RequestObserverHandler.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/RequestObserverHandler.java Mon Sep 12 16:47:28 2016
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment.standby.server;
+
+import java.net.InetSocketAddress;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import org.apache.jackrabbit.oak.segment.standby.store.CommunicationObserver;
+
+/**
+ * Reports every decoded 'get head', 'get segment' and 'get blob' request to a
+ * {@link CommunicationObserver} before passing the message on to the next
+ * inbound handler.
+ */
+class RequestObserverHandler extends ChannelInboundHandlerAdapter {
+
+    private final CommunicationObserver observer;
+
+    /**
+     * @param observer the observer to notify about incoming requests.
+     */
+    RequestObserverHandler(CommunicationObserver observer) {
+        this.observer = observer;
+    }
+
+    /**
+     * Notifies the observer if the message is one of the known request types
+     * and then forwards the message, whatever its type, down the pipeline.
+     */
+    @Override
+    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+        InetSocketAddress remote = (InetSocketAddress) ctx.channel().remoteAddress();
+
+        if (msg instanceof GetHeadRequest) {
+            observer.gotMessageFrom(((GetHeadRequest) msg).getClientId(), "get head", remote);
+        } else if (msg instanceof GetSegmentRequest) {
+            observer.gotMessageFrom(((GetSegmentRequest) msg).getClientId(), "get segment", remote);
+        } else if (msg instanceof GetBlobRequest) {
+            observer.gotMessageFrom(((GetBlobRequest) msg).getClientId(), "get blob id", remote);
+        }
+
+        ctx.fireChannelRead(msg);
+    }
+
+}

Propchange: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/RequestObserverHandler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/ResponseObserverHandler.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/ResponseObserverHandler.java?rev=1760405&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/ResponseObserverHandler.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/ResponseObserverHandler.java Mon Sep 12 16:47:28 2016
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment.standby.server;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelOutboundHandlerAdapter;
+import io.netty.channel.ChannelPromise;
+import org.apache.jackrabbit.oak.segment.standby.store.CommunicationObserver;
+
+/**
+ * Notifies an observer when a 'get segment' or 'get blob' response is written
+ * to the pipeline by this server.
+ */
+class ResponseObserverHandler extends ChannelOutboundHandlerAdapter {
+
+    private final CommunicationObserver observer;
+
+    /**
+     * @param observer the observer to notify about outgoing responses.
+     */
+    ResponseObserverHandler(CommunicationObserver observer) {
+        this.observer = observer;
+    }
+
+    /**
+     * Reports the size of segment and blob responses to the observer, then
+     * passes the message on unchanged. Other message types are only passed on.
+     */
+    @Override
+    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
+        if (msg instanceof GetSegmentResponse) {
+            onGetSegmentResponse((GetSegmentResponse) msg);
+        } else if (msg instanceof GetBlobResponse) {
+            onGetBlobResponse((GetBlobResponse) msg);
+        }
+
+        ctx.write(msg, promise);
+    }
+
+    private void onGetSegmentResponse(GetSegmentResponse response) {
+        observer.didSendSegmentBytes(response.getClientId(), response.getSegment().size());
+    }
+
+    private void onGetBlobResponse(GetBlobResponse response) {
+        observer.didSendBinariesBytes(response.getClientId(), toReportedSize(response.getBlob().length()));
+    }
+
+    /**
+     * Clamps a blob length to the range {@code [0, Integer.MAX_VALUE]}. A plain
+     * {@code (int)} cast would wrap around for lengths above
+     * {@link Integer#MAX_VALUE} and report a bogus, possibly negative, size.
+     */
+    private static int toReportedSize(long length) {
+        return (int) Math.min(Integer.MAX_VALUE, Math.max(0L, length));
+    }
+
+}

Propchange: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/ResponseObserverHandler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyBlobReader.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyBlobReader.java?rev=1760405&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyBlobReader.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyBlobReader.java Mon Sep 12 16:47:28 2016
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment.standby.server;
+
+import org.apache.jackrabbit.oak.api.Blob;
+
+/**
+ * Reads a blob given its ID.
+ */
+interface StandbyBlobReader {
+
+    /**
+     * Reads the blob identified by the given ID.
+     *
+     * @param blobId the ID of the blob to read.
+     * @return the blob for the given ID. NOTE(review): what is returned for an
+     * unknown ID is not specified by this interface; confirm against the
+     * implementations.
+     */
+    Blob readBlob(String blobId);
+
+}

Propchange: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyBlobReader.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyHeadReader.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyHeadReader.java?rev=1760405&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyHeadReader.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyHeadReader.java Mon Sep 12 16:47:28 2016
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment.standby.server;
+
+import org.apache.jackrabbit.oak.segment.RecordId;
+
+/**
+ * Provides access to the ID of the head record.
+ */
+interface StandbyHeadReader {
+
+    /**
+     * Looks up the ID of the head record.
+     *
+     * @return the ID of the head record, or {@code null} when it cannot be
+     * found.
+     */
+    RecordId readHeadRecordId();
+
+}

Propchange: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyHeadReader.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbySegmentReader.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbySegmentReader.java?rev=1760405&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbySegmentReader.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbySegmentReader.java Mon Sep 12 16:47:28 2016
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment.standby.server;
+
+import java.util.UUID;
+
+import org.apache.jackrabbit.oak.segment.Segment;
+
+/**
+ * Reads a segment given its ID.
+ */
+interface StandbySegmentReader {
+
+    /**
+     * Reads the segment identified by the given ID.
+     *
+     * @param segmentId the ID of the segment to read.
+     * @return the segment, or {@code null} if it can't be found; callers such
+     * as {@link GetSegmentRequestHandler} treat {@code null} as "not found".
+     */
+    Segment readSegment(UUID segmentId);
+
+}

Propchange: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbySegmentReader.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyServer.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyServer.java?rev=1760405&r1=1760404&r2=1760405&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyServer.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyServer.java Mon Sep 12 16:47:28 2016
@@ -46,9 +46,6 @@ import io.netty.handler.ssl.SslContext;
 import io.netty.handler.ssl.util.SelfSignedCertificate;
 import io.netty.util.CharsetUtil;
 import org.apache.jackrabbit.oak.segment.file.FileStore;
-import org.apache.jackrabbit.oak.segment.standby.codec.BlobEncoder;
-import org.apache.jackrabbit.oak.segment.standby.codec.RecordIdEncoder;
-import org.apache.jackrabbit.oak.segment.standby.codec.SegmentEncoder;
 import org.apache.jackrabbit.oak.segment.standby.jmx.StandbyStatusMBean;
 import org.apache.jackrabbit.oak.segment.standby.store.CommunicationObserver;
 import org.slf4j.Logger;
@@ -65,11 +62,12 @@ public class StandbyServer implements St
     private final EventLoopGroup workerGroup;
     private final ServerBootstrap b;
     private final CommunicationObserver observer;
-    private final StandbyServerHandler handler;
     private SslContext sslContext;
     private ChannelFuture channelFuture;
     private boolean running;
 
+    private volatile String state;
+
     public StandbyServer(int port, final FileStore store) throws CertificateException, SSLException {
         this(port, store, null, false);
     }
@@ -91,7 +89,6 @@ public class StandbyServer implements St
         }
 
         observer = new CommunicationObserver("primary");
-        handler = new StandbyServerHandler(store, observer);
         bossGroup = new NioEventLoopGroup(1);
         workerGroup = new NioEventLoopGroup();
 
@@ -124,24 +121,49 @@ public class StandbyServer implements St
                     p.addLast(sslContext.newHandler(ch.alloc()));
                 }
 
+                // Decoders
+
                 p.addLast(new LineBasedFrameDecoder(8192));
                 p.addLast(new StringDecoder(CharsetUtil.UTF_8));
+                p.addLast(new RequestDecoder());
+                p.addLast(new StateHandler(newStateConsumer()));
+                p.addLast(new RequestObserverHandler(observer));
+
+                // Encoders
+
                 p.addLast(new SnappyFramedEncoder());
-                p.addLast(new RecordIdEncoder());
-                p.addLast(new SegmentEncoder());
-                p.addLast(new BlobEncoder());
-                p.addLast(handler);
+                p.addLast(new GetHeadResponseEncoder());
+                p.addLast(new GetSegmentResponseEncoder());
+                p.addLast(new GetBlobResponseEncoder());
+                p.addLast(new ResponseObserverHandler(observer));
+
+                // Handlers
+
+                p.addLast(new GetHeadRequestHandler(new DefaultStandbyHeadReader(store)));
+                p.addLast(new GetSegmentRequestHandler(new DefaultStandbySegmentReader(store)));
+                p.addLast(new GetBlobRequestHandler(new DefaultStandbyBlobReader(store)));
             }
         });
     }
 
+    private StateConsumer newStateConsumer() {
+        return new StateConsumer() {
+
+            @Override
+            public void consumeState(String state) {
+                StandbyServer.this.state = state;
+            }
+
+        };
+    }
+
     public String getMBeanName() {
         return StandbyStatusMBean.JMX_NAME + ",id=" + this.port;
     }
 
     public void close() {
         stop();
-        handler.state = STATUS_CLOSING;
+        state = STATUS_CLOSING;
         observer.unregister();
         final MBeanServer jmxServer = ManagementFactory.getPlatformMBeanServer();
         try {
@@ -157,7 +179,7 @@ public class StandbyServer implements St
         if (workerGroup != null && !workerGroup.isShuttingDown()) {
             workerGroup.shutdownGracefully(0, 1, TimeUnit.SECONDS).syncUninterruptibly();
         }
-        handler.state = STATUS_CLOSED;
+        state = STATUS_CLOSED;
     }
 
     @Override
@@ -166,7 +188,7 @@ public class StandbyServer implements St
             return;
         }
 
-        handler.state = STATUS_STARTING;
+        state = STATUS_STARTING;
 
         channelFuture = b.bind(port);
 
@@ -189,20 +211,20 @@ public class StandbyServer implements St
 
     private void onSuccessfulStart() {
         log.debug("Binding was successful");
-        handler.state = STATUS_RUNNING;
+        state = STATUS_RUNNING;
         running = true;
     }
 
     private void onUnsuccessfulStart() {
         log.debug("Binding was unsuccessful", channelFuture.cause());
-        handler.state = null;
+        state = null;
         running = false;
         throw new RuntimeException(channelFuture.cause());
     }
 
     private void onStartTimeOut() {
         log.debug("Binding timed out, canceling");
-        handler.state = null;
+        state = null;
         running = false;
         channelFuture.cancel(true);
     }
@@ -219,13 +241,14 @@ public class StandbyServer implements St
     public void stop() {
         if (running) {
             running = false;
-            this.handler.state = STATUS_STOPPED;
+            this.state = STATUS_STOPPED;
             channelFuture.channel().disconnect();
         }
     }
 
     @Override
     public String getStatus() {
-        return handler == null ? STATUS_INITIALIZING : handler.state;
+        return state == null ? STATUS_INITIALIZING : state;
     }
+
 }

Added: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StateConsumer.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StateConsumer.java?rev=1760405&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StateConsumer.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StateConsumer.java Mon Sep 12 16:47:28 2016
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment.standby.server;
+
+/**
+ * Callback through which {@link StateHandler} publishes the state of the
+ * communication pipeline it tracks.
+ */
+interface StateConsumer {
+
+    /**
+     * Receives a new state of the communication pipeline.
+     *
+     * @param state a textual description of the state, e.g.
+     * {@code "channel active"} as emitted by {@link StateHandler}.
+     */
+    void consumeState(String state);
+
+}

Propchange: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StateConsumer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StateHandler.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StateHandler.java?rev=1760405&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StateHandler.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StateHandler.java Mon Sep 12 16:47:28 2016
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment.standby.server;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+
+/**
+ * Tracks the state of the communication pipeline and communicates it to an
+ * external consumer.
+ */
+class StateHandler extends ChannelInboundHandlerAdapter {
+
+    private final StateConsumer consumer;
+
+    StateHandler(StateConsumer consumer) {
+        this.consumer = consumer;
+    }
+
+    @Override
+    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
+        consumer.consumeState("channel registered");
+        super.channelRegistered(ctx);
+    }
+
+    @Override
+    public void channelActive(ChannelHandlerContext ctx) throws Exception {
+        consumer.consumeState("channel active");
+        super.channelActive(ctx);
+    }
+
+    @Override
+    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+        consumer.consumeState("channel inactive");
+        super.channelInactive(ctx);
+    }
+
+    @Override
+    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
+        consumer.consumeState("channel unregistered");
+        super.channelUnregistered(ctx);
+    }
+
+    @Override
+    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+        consumer.consumeState("got message");
+        super.channelRead(ctx, msg);
+    }
+
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+        consumer.consumeState("exception occurred: " + cause.getMessage());
+        super.exceptionCaught(ctx, cause);
+    }
+
+}

Propchange: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StateHandler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/server/GetBlobRequestHandlerTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/server/GetBlobRequestHandlerTest.java?rev=1760405&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/server/GetBlobRequestHandlerTest.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/server/GetBlobRequestHandlerTest.java Mon Sep 12 16:47:28 2016
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment.standby.server;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import io.netty.channel.embedded.EmbeddedChannel;
+import org.apache.jackrabbit.oak.api.Blob;
+import org.junit.Test;
+
+public class GetBlobRequestHandlerTest {
+
+    private static final String CLIENT_ID = "clientId";
+
+    private static final String BLOB_ID = "blobId";
+
+    /**
+     * Builds an embedded channel whose only handler is a
+     * {@link GetBlobRequestHandler} backed by the given reader.
+     */
+    private static EmbeddedChannel newChannel(StandbyBlobReader blobReader) {
+        return new EmbeddedChannel(new GetBlobRequestHandler(blobReader));
+    }
+
+    /**
+     * When the reader returns a blob, a response carrying the client ID of the
+     * request and that very blob instance must be written outbound.
+     */
+    @Test
+    public void successfulReadsShouldGenerateResponses() throws Exception {
+        Blob expectedBlob = mock(Blob.class);
+
+        StandbyBlobReader blobReader = mock(StandbyBlobReader.class);
+        when(blobReader.readBlob(BLOB_ID)).thenReturn(expectedBlob);
+
+        EmbeddedChannel pipeline = newChannel(blobReader);
+        pipeline.writeInbound(new GetBlobRequest(CLIENT_ID, BLOB_ID));
+
+        GetBlobResponse reply = (GetBlobResponse) pipeline.readOutbound();
+        assertEquals(CLIENT_ID, reply.getClientId());
+        assertSame(expectedBlob, reply.getBlob());
+    }
+
+    /**
+     * When the reader returns {@code null}, nothing must be written outbound.
+     */
+    @Test
+    public void unsuccessfulReadsShouldBeDiscarded() throws Exception {
+        StandbyBlobReader blobReader = mock(StandbyBlobReader.class);
+        when(blobReader.readBlob(BLOB_ID)).thenReturn(null);
+
+        EmbeddedChannel pipeline = newChannel(blobReader);
+        pipeline.writeInbound(new GetBlobRequest(CLIENT_ID, BLOB_ID));
+
+        Object reply = pipeline.readOutbound();
+        assertNull(reply);
+    }
+
+    /**
+     * A message that is not a blob request must come out of the inbound side
+     * of the channel unchanged.
+     */
+    @Test
+    public void unrecognizedMessagesShouldBeIgnored() throws Exception {
+        StandbyBlobReader blobReader = mock(StandbyBlobReader.class);
+
+        EmbeddedChannel pipeline = newChannel(blobReader);
+        pipeline.writeInbound("unrecognized");
+
+        Object passedThrough = pipeline.readInbound();
+        assertEquals("unrecognized", passedThrough);
+    }
+
+}

Propchange: jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/server/GetBlobRequestHandlerTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/server/GetBlobResponseEncoderTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/server/GetBlobResponseEncoderTest.java?rev=1760405&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/server/GetBlobResponseEncoderTest.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/server/GetBlobResponseEncoderTest.java Mon Sep 12 16:47:28 2016
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment.standby.server;
+
+import static org.apache.jackrabbit.oak.segment.standby.server.ServerTestUtils.hash;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.ByteArrayInputStream;
+
+import com.google.common.base.Charsets;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.embedded.EmbeddedChannel;
+import org.apache.jackrabbit.oak.api.Blob;
+import org.apache.jackrabbit.oak.segment.standby.codec.Messages;
+import org.junit.Test;
+
+public class GetBlobResponseEncoderTest {
+
+    /**
+     * Encodes a response for a three-byte blob and compares the resulting
+     * buffer with one assembled by hand: an int length field, the blob header
+     * byte, the length-prefixed UTF-8 content identity, the hash of the
+     * content and finally the content itself.
+     */
+    @Test
+    public void encodeResponse() throws Exception {
+        final byte[] payload = {1, 2, 3};
+        final String blobId = "contentIdentity";
+
+        Blob blob = mock(Blob.class);
+        when(blob.getContentIdentity()).thenReturn(blobId);
+        when(blob.getNewStream()).thenReturn(new ByteArrayInputStream(payload));
+
+        EmbeddedChannel pipeline = new EmbeddedChannel(new GetBlobResponseEncoder());
+        pipeline.writeOutbound(new GetBlobResponse("clientId", blob));
+        ByteBuf actual = (ByteBuf) pipeline.readOutbound();
+
+        byte[] blobIdBytes = blobId.getBytes(Charsets.UTF_8);
+
+        // NOTE(review): the expected length field is 3, which coincides with
+        // the payload size and not with the number of bytes following the
+        // field - confirm this is what GetBlobResponseEncoder intends.
+        ByteBuf expected = Unpooled.buffer()
+                .writeInt(3)
+                .writeByte(Messages.HEADER_BLOB)
+                .writeInt(blobIdBytes.length)
+                .writeBytes(blobIdBytes)
+                .writeLong(hash(payload))
+                .writeBytes(payload);
+
+        assertEquals(expected, actual);
+    }
+
+}

Propchange: jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/server/GetBlobResponseEncoderTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/server/GetHeadRequestHandlerTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/server/GetHeadRequestHandlerTest.java?rev=1760405&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/server/GetHeadRequestHandlerTest.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/server/GetHeadRequestHandlerTest.java Mon Sep 12 16:47:28 2016
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment.standby.server;
+
+import static org.apache.jackrabbit.oak.segment.standby.server.ServerTestUtils.mockRecordId;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import io.netty.channel.embedded.EmbeddedChannel;
+import org.apache.jackrabbit.oak.segment.RecordId;
+import org.junit.Test;
+
+public class GetHeadRequestHandlerTest {
+
+    private static final String CLIENT_ID = "clientId";
+
+    /**
+     * Builds an embedded channel whose only handler is a
+     * {@link GetHeadRequestHandler} backed by the given reader.
+     */
+    private static EmbeddedChannel newChannel(StandbyHeadReader headReader) {
+        return new EmbeddedChannel(new GetHeadRequestHandler(headReader));
+    }
+
+    /**
+     * When the reader returns a head record ID, a response carrying that very
+     * record ID and the client ID of the request must be written outbound.
+     */
+    @Test
+    public void successfulReadsShouldGenerateResponses() throws Exception {
+        RecordId expectedHead = mockRecordId(1, 2, 8);
+
+        StandbyHeadReader headReader = mock(StandbyHeadReader.class);
+        when(headReader.readHeadRecordId()).thenReturn(expectedHead);
+
+        EmbeddedChannel pipeline = newChannel(headReader);
+        pipeline.writeInbound(new GetHeadRequest(CLIENT_ID));
+
+        GetHeadResponse reply = (GetHeadResponse) pipeline.readOutbound();
+        assertSame(expectedHead, reply.getHeadRecordId());
+        assertEquals(CLIENT_ID, reply.getClientId());
+    }
+
+    /**
+     * When the reader returns {@code null}, nothing must be written outbound.
+     */
+    @Test
+    public void unsuccessfulReadsShouldBeDiscarded() throws Exception {
+        StandbyHeadReader headReader = mock(StandbyHeadReader.class);
+        when(headReader.readHeadRecordId()).thenReturn(null);
+
+        EmbeddedChannel pipeline = newChannel(headReader);
+        pipeline.writeInbound(new GetHeadRequest(CLIENT_ID));
+
+        Object reply = pipeline.readOutbound();
+        assertNull(reply);
+    }
+
+    /**
+     * A message that is not a head request must come out of the inbound side
+     * of the channel unchanged.
+     */
+    @Test
+    public void unrecognizedMessagesShouldBeIgnored() throws Exception {
+        StandbyHeadReader headReader = mock(StandbyHeadReader.class);
+
+        EmbeddedChannel pipeline = newChannel(headReader);
+        pipeline.writeInbound("unrecognized");
+
+        Object passedThrough = pipeline.readInbound();
+        assertEquals("unrecognized", passedThrough);
+    }
+
+}

Propchange: jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/server/GetHeadRequestHandlerTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/server/GetHeadResponseEncoderTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/server/GetHeadResponseEncoderTest.java?rev=1760405&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/server/GetHeadResponseEncoderTest.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/server/GetHeadResponseEncoderTest.java Mon Sep 12 16:47:28 2016
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment.standby.server;
+
+import static org.apache.jackrabbit.oak.segment.standby.server.ServerTestUtils.mockRecordId;
+import static org.junit.Assert.assertEquals;
+
+import com.google.common.base.Charsets;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.embedded.EmbeddedChannel;
+import org.apache.jackrabbit.oak.segment.RecordId;
+import org.apache.jackrabbit.oak.segment.standby.codec.Messages;
+import org.junit.Test;
+
+public class GetHeadResponseEncoderTest {
+
+    @Test
+    public void encodeResponse() throws Exception {
+        RecordId recordId = mockRecordId(1, 2, 8);
+
+        EmbeddedChannel channel = new EmbeddedChannel(new GetHeadResponseEncoder());
+        channel.writeOutbound(new GetHeadResponse("clientId", recordId));
+        ByteBuf buffer = (ByteBuf) channel.readOutbound();
+
+        ByteBuf expected = Unpooled.buffer();
+        expected.writeInt(recordId.toString().getBytes(Charsets.UTF_8).length + 1);
+        expected.writeByte(Messages.HEADER_RECORD);
+        expected.writeBytes(recordId.toString().getBytes(Charsets.UTF_8));
+        assertEquals(expected, buffer);
+    }
+
+}

Propchange: jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/server/GetHeadResponseEncoderTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/server/GetSegmentRequestHandlerTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/server/GetSegmentRequestHandlerTest.java?rev=1760405&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/server/GetSegmentRequestHandlerTest.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/server/GetSegmentRequestHandlerTest.java Mon Sep 12 16:47:28 2016
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment.standby.server;
+
+import static org.apache.jackrabbit.oak.segment.standby.server.ServerTestUtils.mockSegment;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.UUID;
+
+import io.netty.channel.embedded.EmbeddedChannel;
+import org.apache.jackrabbit.oak.segment.Segment;
+import org.junit.Test;
+
+public class GetSegmentRequestHandlerTest {
+
+    private static final String CLIENT_ID = "clientId";
+
+    private static final UUID SEGMENT_ID = new UUID(1, 2);
+
+    /**
+     * Builds an embedded channel whose only handler is a
+     * {@link GetSegmentRequestHandler} backed by the given reader.
+     */
+    private static EmbeddedChannel newChannel(StandbySegmentReader segmentReader) {
+        return new EmbeddedChannel(new GetSegmentRequestHandler(segmentReader));
+    }
+
+    /**
+     * When the reader returns a segment, a response carrying the client ID of
+     * the request and that very segment instance must be written outbound.
+     */
+    @Test
+    public void successfulReadsShouldGenerateResponses() throws Exception {
+        Segment expectedSegment = mockSegment(SEGMENT_ID, new byte[] {3, 4, 5});
+
+        StandbySegmentReader segmentReader = mock(StandbySegmentReader.class);
+        when(segmentReader.readSegment(SEGMENT_ID)).thenReturn(expectedSegment);
+
+        EmbeddedChannel pipeline = newChannel(segmentReader);
+        pipeline.writeInbound(new GetSegmentRequest(CLIENT_ID, SEGMENT_ID));
+
+        GetSegmentResponse reply = (GetSegmentResponse) pipeline.readOutbound();
+        assertEquals(CLIENT_ID, reply.getClientId());
+        assertSame(expectedSegment, reply.getSegment());
+    }
+
+    /**
+     * When the reader returns {@code null}, nothing must be written outbound.
+     */
+    @Test
+    public void unsuccessfulReadsShouldBeDiscarded() throws Exception {
+        StandbySegmentReader segmentReader = mock(StandbySegmentReader.class);
+        when(segmentReader.readSegment(SEGMENT_ID)).thenReturn(null);
+
+        EmbeddedChannel pipeline = newChannel(segmentReader);
+        pipeline.writeInbound(new GetSegmentRequest(CLIENT_ID, SEGMENT_ID));
+
+        Object reply = pipeline.readOutbound();
+        assertNull(reply);
+    }
+
+    /**
+     * A message that is not a segment request must come out of the inbound
+     * side of the channel unchanged.
+     */
+    @Test
+    public void unrecognizedMessagesShouldBeIgnored() throws Exception {
+        StandbySegmentReader segmentReader = mock(StandbySegmentReader.class);
+
+        EmbeddedChannel pipeline = newChannel(segmentReader);
+        pipeline.writeInbound("unrecognized");
+
+        Object passedThrough = pipeline.readInbound();
+        assertEquals("unrecognized", passedThrough);
+    }
+
+}

Propchange: jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/server/GetSegmentRequestHandlerTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/server/GetSegmentResponseEncoderTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/server/GetSegmentResponseEncoderTest.java?rev=1760405&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/server/GetSegmentResponseEncoderTest.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/server/GetSegmentResponseEncoderTest.java Mon Sep 12 16:47:28 2016
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment.standby.server;
+
+import static org.apache.jackrabbit.oak.segment.standby.server.ServerTestUtils.hash;
+import static org.apache.jackrabbit.oak.segment.standby.server.ServerTestUtils.mockSegment;
+import static org.junit.Assert.assertEquals;
+
+import java.util.UUID;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.embedded.EmbeddedChannel;
+import org.apache.jackrabbit.oak.segment.Segment;
+import org.apache.jackrabbit.oak.segment.standby.codec.Messages;
+import org.junit.Test;
+
+public class GetSegmentResponseEncoderTest {
+
+    /**
+     * Encodes a segment response and compares the resulting buffer with one
+     * assembled by hand: an int length field, the segment header byte, the
+     * two halves of the segment UUID, the hash of the segment data and the
+     * data itself.
+     */
+    @Test
+    public void encodeResponse() throws Exception {
+        final UUID segmentId = new UUID(1, 2);
+        final byte[] payload = {3, 4, 5};
+
+        Segment segment = mockSegment(segmentId, payload);
+
+        EmbeddedChannel pipeline = new EmbeddedChannel(new GetSegmentResponseEncoder());
+        pipeline.writeOutbound(new GetSegmentResponse("clientId", segment));
+        ByteBuf actual = (ByteBuf) pipeline.readOutbound();
+
+        // The length field counts everything written after it below: the
+        // header byte (1), the UUID as two longs (16), the hash (8) and the
+        // payload.
+        int expectedLength = 1 + 16 + 8 + payload.length;
+
+        ByteBuf expected = Unpooled.buffer()
+                .writeInt(expectedLength)
+                .writeByte(Messages.HEADER_SEGMENT)
+                .writeLong(segmentId.getMostSignificantBits())
+                .writeLong(segmentId.getLeastSignificantBits())
+                .writeLong(hash(payload))
+                .writeBytes(payload);
+
+        assertEquals(expected, actual);
+    }
+
+}

Propchange: jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/server/GetSegmentResponseEncoderTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/server/RequestDecoderTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/server/RequestDecoderTest.java?rev=1760405&r1=1760404&r2=1760405&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/server/RequestDecoderTest.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/server/RequestDecoderTest.java Mon Sep 12 16:47:28 2016
@@ -20,6 +20,8 @@ package org.apache.jackrabbit.oak.segmen
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 
+import java.util.UUID;
+
 import io.netty.channel.embedded.EmbeddedChannel;
 import org.apache.jackrabbit.oak.segment.standby.codec.Messages;
 import org.junit.Test;
@@ -37,10 +39,10 @@ public class RequestDecoderTest {
     @Test
     public void shouldDecodeGetSegmentRequests() throws Exception {
         EmbeddedChannel channel = new EmbeddedChannel(new RequestDecoder());
-        channel.writeInbound(Messages.newGetSegmentRequest("clientId", "segmentId", false));
+        channel.writeInbound(Messages.newGetSegmentRequest("clientId", new UUID(1, 2).toString(), false));
         GetSegmentRequest request = (GetSegmentRequest) channel.readInbound();
         assertEquals("clientId", request.getClientId());
-        assertEquals("segmentId", request.getSegmentId());
+        assertEquals(new UUID(1, 2), request.getSegmentId());
     }
 
     @Test