You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by fr...@apache.org on 2016/09/12 16:47:28 UTC
svn commit: r1760405 [1/2] - in /jackrabbit/oak/trunk/oak-segment-tar/src:
main/java/org/apache/jackrabbit/oak/segment/standby/server/
test/java/org/apache/jackrabbit/oak/segment/standby/server/
Author: frm
Date: Mon Sep 12 16:47:28 2016
New Revision: 1760405
URL: http://svn.apache.org/viewvc?rev=1760405&view=rev
Log:
OAK-4785 - Split StandbyServerHandler into smaller, more cohesive handlers
Added:
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbyBlobReader.java (with props)
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbyHeadReader.java (with props)
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbySegmentReader.java (with props)
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetBlobRequestHandler.java (with props)
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetBlobResponse.java (with props)
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetBlobResponseEncoder.java (with props)
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetHeadRequestHandler.java (with props)
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetHeadResponse.java (with props)
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetHeadResponseEncoder.java (with props)
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetSegmentRequestHandler.java (with props)
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetSegmentResponse.java (with props)
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetSegmentResponseEncoder.java (with props)
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/RequestObserverHandler.java (with props)
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/ResponseObserverHandler.java (with props)
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyBlobReader.java (with props)
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyHeadReader.java (with props)
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbySegmentReader.java (with props)
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StateConsumer.java (with props)
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StateHandler.java (with props)
jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/server/GetBlobRequestHandlerTest.java (with props)
jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/server/GetBlobResponseEncoderTest.java (with props)
jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/server/GetHeadRequestHandlerTest.java (with props)
jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/server/GetHeadResponseEncoderTest.java (with props)
jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/server/GetSegmentRequestHandlerTest.java (with props)
jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/server/GetSegmentResponseEncoderTest.java (with props)
jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/server/ServerTestUtils.java (with props)
Removed:
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyServerHandler.java
Modified:
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetSegmentRequest.java
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/RequestDecoder.java
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyServer.java
jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/server/RequestDecoderTest.java
Added: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbyBlobReader.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbyBlobReader.java?rev=1760405&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbyBlobReader.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbyBlobReader.java Mon Sep 12 16:47:28 2016
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment.standby.server;
+
+import org.apache.jackrabbit.oak.api.Blob;
+import org.apache.jackrabbit.oak.plugins.blob.BlobStoreBlob;
+import org.apache.jackrabbit.oak.segment.file.FileStore;
+import org.apache.jackrabbit.oak.spi.blob.BlobStore;
+
+class DefaultStandbyBlobReader implements StandbyBlobReader {
+
+ private final FileStore store;
+
+ DefaultStandbyBlobReader(FileStore store) {
+ this.store = store;
+ }
+
+ @Override
+ public Blob readBlob(String blobId) {
+ BlobStore blobStore = store.getBlobStore();
+
+ if (blobStore != null) {
+ return new BlobStoreBlob(blobStore, blobId);
+ }
+
+ return null;
+ }
+
+}
Propchange: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbyBlobReader.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbyHeadReader.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbyHeadReader.java?rev=1760405&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbyHeadReader.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbyHeadReader.java Mon Sep 12 16:47:28 2016
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment.standby.server;
+
+import org.apache.jackrabbit.oak.segment.RecordId;
+import org.apache.jackrabbit.oak.segment.file.FileStore;
+
+class DefaultStandbyHeadReader implements StandbyHeadReader {
+
+ private final FileStore store;
+
+ DefaultStandbyHeadReader(FileStore store) {
+ this.store = store;
+ }
+
+ @Override
+ public RecordId readHeadRecordId() {
+ return store.getHead().getRecordId();
+ }
+
+}
Propchange: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbyHeadReader.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbySegmentReader.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbySegmentReader.java?rev=1760405&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbySegmentReader.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbySegmentReader.java Mon Sep 12 16:47:28 2016
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment.standby.server;
+
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.jackrabbit.oak.segment.Segment;
+import org.apache.jackrabbit.oak.segment.SegmentId;
+import org.apache.jackrabbit.oak.segment.SegmentNotFoundException;
+import org.apache.jackrabbit.oak.segment.file.FileStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class DefaultStandbySegmentReader implements StandbySegmentReader {
+
+ private static final Logger log = LoggerFactory.getLogger(DefaultStandbySegmentReader.class);
+
+ private final FileStore store;
+
+ DefaultStandbySegmentReader(FileStore store) {
+ this.store = store;
+ }
+
+ @Override
+ public Segment readSegment(UUID uuid) {
+ long msb = uuid.getMostSignificantBits();
+ long lsb = uuid.getLeastSignificantBits();
+
+ SegmentId id = store.newSegmentId(msb, lsb);
+
+ for (int i = 0; i < 10; i++) {
+ try {
+ return store.readSegment(id);
+ } catch (SegmentNotFoundException e) {
+ log.warn("Unable to read segment, waiting...", e);
+ }
+
+ try {
+ TimeUnit.MILLISECONDS.sleep(2000);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return null;
+ }
+ }
+
+ return null;
+ }
+
+}
Propchange: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbySegmentReader.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetBlobRequestHandler.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetBlobRequestHandler.java?rev=1760405&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetBlobRequestHandler.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetBlobRequestHandler.java Mon Sep 12 16:47:28 2016
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment.standby.server;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import org.apache.jackrabbit.oak.api.Blob;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class GetBlobRequestHandler extends SimpleChannelInboundHandler<GetBlobRequest> {
+
+ private static final Logger log = LoggerFactory.getLogger(GetBlobRequestHandler.class);
+
+ private final StandbyBlobReader reader;
+
+ GetBlobRequestHandler(StandbyBlobReader reader) {
+ this.reader = reader;
+ }
+
+ @Override
+ protected void channelRead0(ChannelHandlerContext ctx, GetBlobRequest msg) throws Exception {
+ log.debug("Reading blob {} for client {}", msg.getBlobId(), msg.getClientId());
+
+ Blob blob = reader.readBlob(msg.getBlobId());
+
+ if (blob == null) {
+ log.debug("Blob {} not found, discarding request from client {}", msg.getBlobId(), msg.getClientId());
+ return;
+ }
+
+ ctx.writeAndFlush(new GetBlobResponse(msg.getClientId(), blob));
+ }
+
+}
Propchange: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetBlobRequestHandler.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetBlobResponse.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetBlobResponse.java?rev=1760405&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetBlobResponse.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetBlobResponse.java Mon Sep 12 16:47:28 2016
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment.standby.server;
+
+import org.apache.jackrabbit.oak.api.Blob;
+
+class GetBlobResponse {
+
+ private final String clientId;
+
+ private final Blob blob;
+
+ GetBlobResponse(String clientId, Blob blob) {
+ this.clientId = clientId;
+ this.blob = blob;
+ }
+
+ String getClientId() {
+ return clientId;
+ }
+
+ Blob getBlob() {
+ return blob;
+ }
+
+}
Propchange: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetBlobResponse.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetBlobResponseEncoder.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetBlobResponseEncoder.java?rev=1760405&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetBlobResponseEncoder.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetBlobResponseEncoder.java Mon Sep 12 16:47:28 2016
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment.standby.server;
+
+import java.io.InputStream;
+import java.nio.charset.Charset;
+
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.MessageToByteEncoder;
+import org.apache.commons.io.IOUtils;
+import org.apache.jackrabbit.oak.api.Blob;
+import org.apache.jackrabbit.oak.segment.standby.codec.Messages;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class GetBlobResponseEncoder extends MessageToByteEncoder<GetBlobResponse> {
+
+ private static final Logger log = LoggerFactory.getLogger(GetBlobResponseEncoder.class);
+
+ @Override
+ protected void encode(ChannelHandlerContext ctx, GetBlobResponse msg, ByteBuf out) throws Exception {
+ log.debug("Sending blob {} to client {}", msg.getBlob().getContentIdentity(), msg.getClientId());
+ encode(msg.getBlob(), out);
+ }
+
+ private void encode(Blob b, ByteBuf out) throws Exception {
+ byte[] bytes;
+
+ try (InputStream s = b.getNewStream()) {
+ bytes = IOUtils.toByteArray(s);
+ }
+
+ Hasher hasher = Hashing.murmur3_32().newHasher();
+ long hash = hasher.putBytes(bytes).hash().padToLong();
+
+ out.writeInt(bytes.length);
+ out.writeByte(Messages.HEADER_BLOB);
+
+ String bid = b.getContentIdentity();
+ byte[] id = bid.getBytes(Charset.forName("UTF-8"));
+ out.writeInt(id.length);
+ out.writeBytes(id);
+
+ out.writeLong(hash);
+ out.writeBytes(bytes);
+ }
+
+}
Propchange: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetBlobResponseEncoder.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetHeadRequestHandler.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetHeadRequestHandler.java?rev=1760405&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetHeadRequestHandler.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetHeadRequestHandler.java Mon Sep 12 16:47:28 2016
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment.standby.server;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import org.apache.jackrabbit.oak.segment.RecordId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Handles 'get head' requests and produces 'get head' responses. A response is
+ * generated only iff the record ID of the head root state can be read.
+ */
+class GetHeadRequestHandler extends SimpleChannelInboundHandler<GetHeadRequest> {
+
+ private static final Logger log = LoggerFactory.getLogger(GetHeadRequest.class);
+
+ private final StandbyHeadReader reader;
+
+ GetHeadRequestHandler(StandbyHeadReader reader) {
+ this.reader = reader;
+ }
+
+ @Override
+ protected void channelRead0(ChannelHandlerContext ctx, GetHeadRequest msg) throws Exception {
+ log.debug("Reading head for client {}", msg.getClientId());
+
+ RecordId id = reader.readHeadRecordId();
+
+ if (id == null) {
+ log.debug("Head not found, discarding request from client {}", msg.getClientId());
+ return;
+ }
+
+ ctx.writeAndFlush(new GetHeadResponse(msg.getClientId(), id));
+ }
+
+}
Propchange: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetHeadRequestHandler.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetHeadResponse.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetHeadResponse.java?rev=1760405&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetHeadResponse.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetHeadResponse.java Mon Sep 12 16:47:28 2016
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment.standby.server;
+
+import org.apache.jackrabbit.oak.segment.RecordId;
+
+class GetHeadResponse {
+
+ private final String clientId;
+
+ private final RecordId headRecordId;
+
+ GetHeadResponse(String clientId, RecordId headRecordId) {
+ this.clientId = clientId;
+ this.headRecordId = headRecordId;
+ }
+
+ String getClientId() {
+ return clientId;
+ }
+
+ RecordId getHeadRecordId() {
+ return headRecordId;
+ }
+
+}
Propchange: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetHeadResponse.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetHeadResponseEncoder.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetHeadResponseEncoder.java?rev=1760405&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetHeadResponseEncoder.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetHeadResponseEncoder.java Mon Sep 12 16:47:28 2016
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment.standby.server;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.MessageToByteEncoder;
+import io.netty.util.CharsetUtil;
+import org.apache.jackrabbit.oak.segment.standby.codec.Messages;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Encodes a 'get head' response.
+ */
+class GetHeadResponseEncoder extends MessageToByteEncoder<GetHeadResponse> {
+
+ private static final Logger log = LoggerFactory.getLogger(GetHeadResponseEncoder.class);
+
+ @Override
+ protected void encode(ChannelHandlerContext ctx, GetHeadResponse msg, ByteBuf out) throws Exception {
+ log.debug("Sending head {} to client {}", msg.getHeadRecordId(), msg.getClientId());
+ byte[] body = msg.getHeadRecordId().toString().getBytes(CharsetUtil.UTF_8);
+ out.writeInt(body.length + 1);
+ out.writeByte(Messages.HEADER_RECORD);
+ out.writeBytes(body);
+ }
+
+}
Propchange: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetHeadResponseEncoder.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetSegmentRequest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetSegmentRequest.java?rev=1760405&r1=1760404&r2=1760405&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetSegmentRequest.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetSegmentRequest.java Mon Sep 12 16:47:28 2016
@@ -17,13 +17,15 @@
package org.apache.jackrabbit.oak.segment.standby.server;
+import java.util.UUID;
+
class GetSegmentRequest {
private final String clientId;
- private final String segmentId;
+ private final UUID segmentId;
- GetSegmentRequest(String clientId, String segmentId) {
+ GetSegmentRequest(String clientId, UUID segmentId) {
this.clientId = clientId;
this.segmentId = segmentId;
}
@@ -32,7 +34,7 @@ class GetSegmentRequest {
return clientId;
}
- public String getSegmentId() {
+ public UUID getSegmentId() {
return segmentId;
}
Added: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetSegmentRequestHandler.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetSegmentRequestHandler.java?rev=1760405&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetSegmentRequestHandler.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetSegmentRequestHandler.java Mon Sep 12 16:47:28 2016
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment.standby.server;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import org.apache.jackrabbit.oak.segment.Segment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class GetSegmentRequestHandler extends SimpleChannelInboundHandler<GetSegmentRequest> {
+
+ private static final Logger log = LoggerFactory.getLogger(GetSegmentRequestHandler.class);
+
+ private final StandbySegmentReader reader;
+
+ GetSegmentRequestHandler(StandbySegmentReader reader) {
+ this.reader = reader;
+ }
+
+ @Override
+ protected void channelRead0(ChannelHandlerContext ctx, GetSegmentRequest msg) throws Exception {
+ log.debug("Reading segment {} for client {}", msg.getSegmentId(), msg.getClientId());
+
+ Segment segment = reader.readSegment(msg.getSegmentId());
+
+ if (segment == null) {
+ log.debug("Segment {} not found, discarding request from client {}", msg.getSegmentId(), msg.getClientId());
+ return;
+ }
+
+ ctx.writeAndFlush(new GetSegmentResponse(msg.getClientId(), segment));
+ }
+
+}
Propchange: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetSegmentRequestHandler.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetSegmentResponse.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetSegmentResponse.java?rev=1760405&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetSegmentResponse.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetSegmentResponse.java Mon Sep 12 16:47:28 2016
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment.standby.server;
+
+import org.apache.jackrabbit.oak.segment.Segment;
+
+class GetSegmentResponse {
+
+ private final String clientId;
+
+ private final Segment segment;
+
+ GetSegmentResponse(String clientId, Segment segment) {
+ this.clientId = clientId;
+ this.segment = segment;
+ }
+
+ String getClientId() {
+ return clientId;
+ }
+
+ Segment getSegment() {
+ return segment;
+ }
+
+}
Propchange: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetSegmentResponse.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetSegmentResponseEncoder.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetSegmentResponseEncoder.java?rev=1760405&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetSegmentResponseEncoder.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetSegmentResponseEncoder.java Mon Sep 12 16:47:28 2016
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment.standby.server;
+
+import java.io.ByteArrayOutputStream;
+
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.MessageToByteEncoder;
+import org.apache.jackrabbit.oak.segment.Segment;
+import org.apache.jackrabbit.oak.segment.SegmentId;
+import org.apache.jackrabbit.oak.segment.standby.codec.Messages;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Encodes a 'get segment' response.
+ */
+class GetSegmentResponseEncoder extends MessageToByteEncoder<GetSegmentResponse> {
+
+ private static final Logger log = LoggerFactory.getLogger(GetSegmentResponseEncoder.class);
+
+ private static final int EXTRA_HEADERS_LEN = 29;
+
+ private static final int EXTRA_HEADERS_WO_SIZE = EXTRA_HEADERS_LEN - 4;
+
+ @Override
+ protected void encode(ChannelHandlerContext ctx, GetSegmentResponse msg, ByteBuf out) throws Exception {
+ log.debug("Sending segment {} to client {}", msg.getSegment().getSegmentId(), msg.getClientId());
+ encode(msg.getSegment(), out);
+ }
+
+ private void encode(Segment s, ByteBuf out) throws Exception {
+ SegmentId id = s.getSegmentId();
+
+ ByteArrayOutputStream baos = new ByteArrayOutputStream(s.size());
+ s.writeTo(baos);
+ byte[] segment = baos.toByteArray();
+
+ Hasher hasher = Hashing.murmur3_32().newHasher();
+ long hash = hasher.putBytes(segment).hash().padToLong();
+
+ int len = segment.length + EXTRA_HEADERS_WO_SIZE;
+ out.writeInt(len);
+ out.writeByte(Messages.HEADER_SEGMENT);
+ out.writeLong(id.getMostSignificantBits());
+ out.writeLong(id.getLeastSignificantBits());
+ out.writeLong(hash);
+ out.writeBytes(segment);
+ }
+
+}
Propchange: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetSegmentResponseEncoder.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/RequestDecoder.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/RequestDecoder.java?rev=1760405&r1=1760404&r2=1760405&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/RequestDecoder.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/RequestDecoder.java Mon Sep 12 16:47:28 2016
@@ -18,6 +18,7 @@
package org.apache.jackrabbit.oak.segment.standby.server;
import java.util.List;
+import java.util.UUID;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageDecoder;
@@ -45,7 +46,7 @@ class RequestDecoder extends MessageToMe
out.add(new GetHeadRequest(Messages.extractClientFrom(msg)));
} else if (request.startsWith(Messages.GET_SEGMENT)) {
log.debug("Parsed 'get segment' message");
- out.add(new GetSegmentRequest(Messages.extractClientFrom(msg), request.substring(Messages.GET_SEGMENT.length())));
+ out.add(new GetSegmentRequest(Messages.extractClientFrom(msg), UUID.fromString(request.substring(Messages.GET_SEGMENT.length()))));
} else if (request.startsWith(Messages.GET_BLOB)) {
log.debug("Parsed 'get blob' message");
out.add(new GetBlobRequest(Messages.extractClientFrom(msg), request.substring(Messages.GET_BLOB.length())));
Added: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/RequestObserverHandler.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/RequestObserverHandler.java?rev=1760405&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/RequestObserverHandler.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/RequestObserverHandler.java Mon Sep 12 16:47:28 2016
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment.standby.server;
+
+import java.net.InetSocketAddress;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import org.apache.jackrabbit.oak.segment.standby.store.CommunicationObserver;
+
+/**
+ * Notifies an observer when a valid request has been received and parsed by
+ * this server.
+ */
+class RequestObserverHandler extends ChannelInboundHandlerAdapter {
+
+ private final CommunicationObserver observer;
+
+ RequestObserverHandler(CommunicationObserver observer) {
+ this.observer = observer;
+ }
+
+ @Override
+ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+ InetSocketAddress address = (InetSocketAddress) ctx.channel().remoteAddress();
+
+ if (msg instanceof GetHeadRequest) {
+ onGetHeadRequest((GetHeadRequest) msg, address);
+ } else if (msg instanceof GetSegmentRequest) {
+ onGetSegmentRequest((GetSegmentRequest) msg, address);
+ } else if (msg instanceof GetBlobRequest) {
+ onGetBlobRequest((GetBlobRequest) msg, address);
+ }
+
+ ctx.fireChannelRead(msg);
+ }
+
+ private void onGetHeadRequest(GetHeadRequest request, InetSocketAddress address) throws Exception {
+ observer.gotMessageFrom(request.getClientId(), "get head", address);
+ }
+
+ private void onGetSegmentRequest(GetSegmentRequest request, InetSocketAddress address) throws Exception {
+ observer.gotMessageFrom(request.getClientId(), "get segment", address);
+ }
+
+ private void onGetBlobRequest(GetBlobRequest request, InetSocketAddress address) throws Exception {
+ observer.gotMessageFrom(request.getClientId(), "get blob id", address);
+ }
+
+}
Propchange: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/RequestObserverHandler.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/ResponseObserverHandler.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/ResponseObserverHandler.java?rev=1760405&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/ResponseObserverHandler.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/ResponseObserverHandler.java Mon Sep 12 16:47:28 2016
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment.standby.server;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelOutboundHandlerAdapter;
+import io.netty.channel.ChannelPromise;
+import org.apache.jackrabbit.oak.segment.standby.store.CommunicationObserver;
+
+/**
+ * Notifies an observer when a 'get segment' or 'get blob' response is sent
+ * from this server.
+ */
+class ResponseObserverHandler extends ChannelOutboundHandlerAdapter {
+
+ private final CommunicationObserver observer;
+
+ ResponseObserverHandler(CommunicationObserver observer) {
+ this.observer = observer;
+ }
+
+ @Override
+ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
+ if (msg instanceof GetSegmentResponse) {
+ onGetSegmentResponse((GetSegmentResponse) msg);
+ } else if (msg instanceof GetBlobResponse) {
+ onGetBlobResponse((GetBlobResponse) msg);
+ }
+
+ ctx.write(msg, promise);
+ }
+
+ private void onGetSegmentResponse(GetSegmentResponse response) {
+ observer.didSendSegmentBytes(response.getClientId(), response.getSegment().size());
+ }
+
+ private void onGetBlobResponse(GetBlobResponse response) {
+ observer.didSendBinariesBytes(response.getClientId(), (int) Math.max(0, response.getBlob().length()));
+ }
+
+}
Propchange: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/ResponseObserverHandler.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyBlobReader.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyBlobReader.java?rev=1760405&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyBlobReader.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyBlobReader.java Mon Sep 12 16:47:28 2016
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment.standby.server;
+
+import org.apache.jackrabbit.oak.api.Blob;
+
+interface StandbyBlobReader {
+
+ Blob readBlob(String blobId);
+
+}
Propchange: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyBlobReader.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyHeadReader.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyHeadReader.java?rev=1760405&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyHeadReader.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyHeadReader.java Mon Sep 12 16:47:28 2016
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment.standby.server;
+
+import org.apache.jackrabbit.oak.segment.RecordId;
+
+/**
+ * Read the head record ID.
+ */
+interface StandbyHeadReader {
+
+ /**
+ * Read the head record ID.
+ *
+ * @return the head record ID or {@code null} if the head record ID can't be
+ * found.
+ */
+ RecordId readHeadRecordId();
+
+}
Propchange: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyHeadReader.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbySegmentReader.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbySegmentReader.java?rev=1760405&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbySegmentReader.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbySegmentReader.java Mon Sep 12 16:47:28 2016
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment.standby.server;
+
+import java.util.UUID;
+
+import org.apache.jackrabbit.oak.segment.Segment;
+
+interface StandbySegmentReader {
+
+ Segment readSegment(UUID segmentId);
+
+}
Propchange: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbySegmentReader.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyServer.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyServer.java?rev=1760405&r1=1760404&r2=1760405&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyServer.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyServer.java Mon Sep 12 16:47:28 2016
@@ -46,9 +46,6 @@ import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.util.SelfSignedCertificate;
import io.netty.util.CharsetUtil;
import org.apache.jackrabbit.oak.segment.file.FileStore;
-import org.apache.jackrabbit.oak.segment.standby.codec.BlobEncoder;
-import org.apache.jackrabbit.oak.segment.standby.codec.RecordIdEncoder;
-import org.apache.jackrabbit.oak.segment.standby.codec.SegmentEncoder;
import org.apache.jackrabbit.oak.segment.standby.jmx.StandbyStatusMBean;
import org.apache.jackrabbit.oak.segment.standby.store.CommunicationObserver;
import org.slf4j.Logger;
@@ -65,11 +62,12 @@ public class StandbyServer implements St
private final EventLoopGroup workerGroup;
private final ServerBootstrap b;
private final CommunicationObserver observer;
- private final StandbyServerHandler handler;
private SslContext sslContext;
private ChannelFuture channelFuture;
private boolean running;
+ private volatile String state;
+
public StandbyServer(int port, final FileStore store) throws CertificateException, SSLException {
this(port, store, null, false);
}
@@ -91,7 +89,6 @@ public class StandbyServer implements St
}
observer = new CommunicationObserver("primary");
- handler = new StandbyServerHandler(store, observer);
bossGroup = new NioEventLoopGroup(1);
workerGroup = new NioEventLoopGroup();
@@ -124,24 +121,49 @@ public class StandbyServer implements St
p.addLast(sslContext.newHandler(ch.alloc()));
}
+ // Decoders
+
p.addLast(new LineBasedFrameDecoder(8192));
p.addLast(new StringDecoder(CharsetUtil.UTF_8));
+ p.addLast(new RequestDecoder());
+ p.addLast(new StateHandler(newStateConsumer()));
+ p.addLast(new RequestObserverHandler(observer));
+
+ // Encoders
+
p.addLast(new SnappyFramedEncoder());
- p.addLast(new RecordIdEncoder());
- p.addLast(new SegmentEncoder());
- p.addLast(new BlobEncoder());
- p.addLast(handler);
+ p.addLast(new GetHeadResponseEncoder());
+ p.addLast(new GetSegmentResponseEncoder());
+ p.addLast(new GetBlobResponseEncoder());
+ p.addLast(new ResponseObserverHandler(observer));
+
+ // Handlers
+
+ p.addLast(new GetHeadRequestHandler(new DefaultStandbyHeadReader(store)));
+ p.addLast(new GetSegmentRequestHandler(new DefaultStandbySegmentReader(store)));
+ p.addLast(new GetBlobRequestHandler(new DefaultStandbyBlobReader(store)));
}
});
}
+ private StateConsumer newStateConsumer() {
+ return new StateConsumer() {
+
+ @Override
+ public void consumeState(String state) {
+ StandbyServer.this.state = state;
+ }
+
+ };
+ }
+
public String getMBeanName() {
return StandbyStatusMBean.JMX_NAME + ",id=" + this.port;
}
public void close() {
stop();
- handler.state = STATUS_CLOSING;
+ state = STATUS_CLOSING;
observer.unregister();
final MBeanServer jmxServer = ManagementFactory.getPlatformMBeanServer();
try {
@@ -157,7 +179,7 @@ public class StandbyServer implements St
if (workerGroup != null && !workerGroup.isShuttingDown()) {
workerGroup.shutdownGracefully(0, 1, TimeUnit.SECONDS).syncUninterruptibly();
}
- handler.state = STATUS_CLOSED;
+ state = STATUS_CLOSED;
}
@Override
@@ -166,7 +188,7 @@ public class StandbyServer implements St
return;
}
- handler.state = STATUS_STARTING;
+ state = STATUS_STARTING;
channelFuture = b.bind(port);
@@ -189,20 +211,20 @@ public class StandbyServer implements St
private void onSuccessfulStart() {
log.debug("Binding was successful");
- handler.state = STATUS_RUNNING;
+ state = STATUS_RUNNING;
running = true;
}
private void onUnsuccessfulStart() {
log.debug("Binding was unsuccessful", channelFuture.cause());
- handler.state = null;
+ state = null;
running = false;
throw new RuntimeException(channelFuture.cause());
}
private void onStartTimeOut() {
log.debug("Binding timed out, canceling");
- handler.state = null;
+ state = null;
running = false;
channelFuture.cancel(true);
}
@@ -219,13 +241,14 @@ public class StandbyServer implements St
public void stop() {
if (running) {
running = false;
- this.handler.state = STATUS_STOPPED;
+ this.state = STATUS_STOPPED;
channelFuture.channel().disconnect();
}
}
@Override
public String getStatus() {
- return handler == null ? STATUS_INITIALIZING : handler.state;
+ return state == null ? STATUS_INITIALIZING : state;
}
+
}
Added: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StateConsumer.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StateConsumer.java?rev=1760405&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StateConsumer.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StateConsumer.java Mon Sep 12 16:47:28 2016
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment.standby.server;
+
+/**
+ * Implementors of this interface can consume the state of the communication
+ * pipeline as tracked by {@link StateHandler}.
+ */
+interface StateConsumer {
+
+ void consumeState(String state);
+
+}
Propchange: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StateConsumer.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StateHandler.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StateHandler.java?rev=1760405&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StateHandler.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StateHandler.java Mon Sep 12 16:47:28 2016
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment.standby.server;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+
+/**
+ * Tracks the state of the communication pipeline and communicates it to an
+ * external consumer.
+ */
+class StateHandler extends ChannelInboundHandlerAdapter {
+
+ private final StateConsumer consumer;
+
+ StateHandler(StateConsumer consumer) {
+ this.consumer = consumer;
+ }
+
+ @Override
+ public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
+ consumer.consumeState("channel registered");
+ super.channelRegistered(ctx);
+ }
+
+ @Override
+ public void channelActive(ChannelHandlerContext ctx) throws Exception {
+ consumer.consumeState("channel active");
+ super.channelActive(ctx);
+ }
+
+ @Override
+ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+ consumer.consumeState("channel inactive");
+ super.channelInactive(ctx);
+ }
+
+ @Override
+ public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
+ consumer.consumeState("channel unregistered");
+ super.channelUnregistered(ctx);
+ }
+
+ @Override
+ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+ consumer.consumeState("got message");
+ super.channelRead(ctx, msg);
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+ consumer.consumeState("exception occurred: " + cause.getMessage());
+ super.exceptionCaught(ctx, cause);
+ }
+
+}
Propchange: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StateHandler.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/server/GetBlobRequestHandlerTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/server/GetBlobRequestHandlerTest.java?rev=1760405&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/server/GetBlobRequestHandlerTest.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/server/GetBlobRequestHandlerTest.java Mon Sep 12 16:47:28 2016
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment.standby.server;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import io.netty.channel.embedded.EmbeddedChannel;
+import org.apache.jackrabbit.oak.api.Blob;
+import org.junit.Test;
+
+public class GetBlobRequestHandlerTest {
+
+ @Test
+ public void successfulReadsShouldGenerateResponses() throws Exception {
+ Blob blob = mock(Blob.class);
+
+ StandbyBlobReader reader = mock(StandbyBlobReader.class);
+ when(reader.readBlob("blobId")).thenReturn(blob);
+
+ EmbeddedChannel channel = new EmbeddedChannel(new GetBlobRequestHandler(reader));
+ channel.writeInbound(new GetBlobRequest("clientId", "blobId"));
+ GetBlobResponse response = (GetBlobResponse) channel.readOutbound();
+ assertEquals("clientId", response.getClientId());
+ assertSame(blob, response.getBlob());
+ }
+
+ @Test
+ public void unsuccessfulReadsShouldBeDiscarded() throws Exception {
+ StandbyBlobReader reader = mock(StandbyBlobReader.class);
+ when(reader.readBlob("blobId")).thenReturn(null);
+
+ EmbeddedChannel channel = new EmbeddedChannel(new GetBlobRequestHandler(reader));
+ channel.writeInbound(new GetBlobRequest("clientId", "blobId"));
+ assertNull(channel.readOutbound());
+ }
+
+ @Test
+ public void unrecognizedMessagesShouldBeIgnored() throws Exception {
+ StandbyBlobReader reader = mock(StandbyBlobReader.class);
+ EmbeddedChannel channel = new EmbeddedChannel(new GetBlobRequestHandler(reader));
+ channel.writeInbound("unrecognized");
+ assertEquals("unrecognized", channel.readInbound());
+ }
+
+}
Propchange: jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/server/GetBlobRequestHandlerTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/server/GetBlobResponseEncoderTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/server/GetBlobResponseEncoderTest.java?rev=1760405&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/server/GetBlobResponseEncoderTest.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/server/GetBlobResponseEncoderTest.java Mon Sep 12 16:47:28 2016
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment.standby.server;
+
+import static org.apache.jackrabbit.oak.segment.standby.server.ServerTestUtils.hash;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.ByteArrayInputStream;
+
+import com.google.common.base.Charsets;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.embedded.EmbeddedChannel;
+import org.apache.jackrabbit.oak.api.Blob;
+import org.apache.jackrabbit.oak.segment.standby.codec.Messages;
+import org.junit.Test;
+
+public class GetBlobResponseEncoderTest {
+
+ @Test
+ public void encodeResponse() throws Exception {
+ byte[] data = new byte[] {1, 2, 3};
+
+ String contentIdentity = "contentIdentity";
+ byte[] contentIdentityBytes = contentIdentity.getBytes(Charsets.UTF_8);
+
+ Blob blob = mock(Blob.class);
+ when(blob.getNewStream()).thenReturn(new ByteArrayInputStream(data));
+ when(blob.getContentIdentity()).thenReturn(contentIdentity);
+
+ EmbeddedChannel channel = new EmbeddedChannel(new GetBlobResponseEncoder());
+ channel.writeOutbound(new GetBlobResponse("clientId", blob));
+ ByteBuf buffer = (ByteBuf) channel.readOutbound();
+
+ ByteBuf expected = Unpooled.buffer();
+ expected.writeInt(3);
+ expected.writeByte(Messages.HEADER_BLOB);
+ expected.writeInt(contentIdentityBytes.length);
+ expected.writeBytes(contentIdentityBytes);
+ expected.writeLong(hash(data));
+ expected.writeBytes(data);
+
+ assertEquals(expected, buffer);
+ }
+
+}
Propchange: jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/server/GetBlobResponseEncoderTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/server/GetHeadRequestHandlerTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/server/GetHeadRequestHandlerTest.java?rev=1760405&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/server/GetHeadRequestHandlerTest.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/server/GetHeadRequestHandlerTest.java Mon Sep 12 16:47:28 2016
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment.standby.server;
+
+import static org.apache.jackrabbit.oak.segment.standby.server.ServerTestUtils.mockRecordId;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import io.netty.channel.embedded.EmbeddedChannel;
+import org.apache.jackrabbit.oak.segment.RecordId;
+import org.junit.Test;
+
+public class GetHeadRequestHandlerTest {
+
+ @Test
+ public void successfulReadsShouldGenerateResponses() throws Exception {
+ RecordId headRecordId = mockRecordId(1, 2, 8);
+
+ StandbyHeadReader reader = mock(StandbyHeadReader.class);
+ when(reader.readHeadRecordId()).thenReturn(headRecordId);
+
+ EmbeddedChannel channel = new EmbeddedChannel(new GetHeadRequestHandler(reader));
+ channel.writeInbound(new GetHeadRequest("clientId"));
+ GetHeadResponse response = (GetHeadResponse) channel.readOutbound();
+ assertSame(headRecordId, response.getHeadRecordId());
+ assertEquals("clientId", response.getClientId());
+ }
+
+ @Test
+ public void unsuccessfulReadsShouldBeDiscarded() throws Exception {
+ StandbyHeadReader reader = mock(StandbyHeadReader.class);
+ when(reader.readHeadRecordId()).thenReturn(null);
+
+ EmbeddedChannel channel = new EmbeddedChannel(new GetHeadRequestHandler(reader));
+ channel.writeInbound(new GetHeadRequest("clientId"));
+ assertNull(channel.readOutbound());
+ }
+
+ @Test
+ public void unrecognizedMessagesShouldBeIgnored() throws Exception {
+ EmbeddedChannel channel = new EmbeddedChannel(new GetHeadRequestHandler(mock(StandbyHeadReader.class)));
+ channel.writeInbound("unrecognized");
+ assertEquals("unrecognized", channel.readInbound());
+ }
+
+}
Propchange: jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/server/GetHeadRequestHandlerTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/server/GetHeadResponseEncoderTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/server/GetHeadResponseEncoderTest.java?rev=1760405&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/server/GetHeadResponseEncoderTest.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/server/GetHeadResponseEncoderTest.java Mon Sep 12 16:47:28 2016
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment.standby.server;
+
+import static org.apache.jackrabbit.oak.segment.standby.server.ServerTestUtils.mockRecordId;
+import static org.junit.Assert.assertEquals;
+
+import com.google.common.base.Charsets;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.embedded.EmbeddedChannel;
+import org.apache.jackrabbit.oak.segment.RecordId;
+import org.apache.jackrabbit.oak.segment.standby.codec.Messages;
+import org.junit.Test;
+
+public class GetHeadResponseEncoderTest {
+
+ @Test
+ public void encodeResponse() throws Exception {
+ RecordId recordId = mockRecordId(1, 2, 8);
+
+ EmbeddedChannel channel = new EmbeddedChannel(new GetHeadResponseEncoder());
+ channel.writeOutbound(new GetHeadResponse("clientId", recordId));
+ ByteBuf buffer = (ByteBuf) channel.readOutbound();
+
+ ByteBuf expected = Unpooled.buffer();
+ expected.writeInt(recordId.toString().getBytes(Charsets.UTF_8).length + 1);
+ expected.writeByte(Messages.HEADER_RECORD);
+ expected.writeBytes(recordId.toString().getBytes(Charsets.UTF_8));
+ assertEquals(expected, buffer);
+ }
+
+}
Propchange: jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/server/GetHeadResponseEncoderTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/server/GetSegmentRequestHandlerTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/server/GetSegmentRequestHandlerTest.java?rev=1760405&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/server/GetSegmentRequestHandlerTest.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/server/GetSegmentRequestHandlerTest.java Mon Sep 12 16:47:28 2016
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment.standby.server;
+
+import static org.apache.jackrabbit.oak.segment.standby.server.ServerTestUtils.mockSegment;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.UUID;
+
+import io.netty.channel.embedded.EmbeddedChannel;
+import org.apache.jackrabbit.oak.segment.Segment;
+import org.junit.Test;
+
+public class GetSegmentRequestHandlerTest {
+
+ @Test
+ public void successfulReadsShouldGenerateResponses() throws Exception {
+ UUID uuid = new UUID(1, 2);
+
+ Segment segment = mockSegment(uuid, new byte[] {3, 4, 5});
+
+ StandbySegmentReader reader = mock(StandbySegmentReader.class);
+ when(reader.readSegment(uuid)).thenReturn(segment);
+
+ EmbeddedChannel channel = new EmbeddedChannel(new GetSegmentRequestHandler(reader));
+ channel.writeInbound(new GetSegmentRequest("clientId", uuid));
+ GetSegmentResponse response = (GetSegmentResponse) channel.readOutbound();
+ assertEquals("clientId", response.getClientId());
+ assertSame(segment, response.getSegment());
+ }
+
+ @Test
+ public void unsuccessfulReadsShouldBeDiscarded() throws Exception {
+ UUID uuid = new UUID(1, 2);
+
+ StandbySegmentReader reader = mock(StandbySegmentReader.class);
+ when(reader.readSegment(uuid)).thenReturn(null);
+
+ EmbeddedChannel channel = new EmbeddedChannel(new GetSegmentRequestHandler(reader));
+ channel.writeInbound(new GetSegmentRequest("clientId", uuid));
+ assertNull(channel.readOutbound());
+ }
+
+ @Test
+ public void unrecognizedMessagesShouldBeIgnored() throws Exception {
+ StandbySegmentReader reader = mock(StandbySegmentReader.class);
+ EmbeddedChannel channel = new EmbeddedChannel(new GetSegmentRequestHandler(reader));
+ channel.writeInbound("unrecognized");
+ assertEquals("unrecognized", channel.readInbound());
+ }
+
+}
Propchange: jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/server/GetSegmentRequestHandlerTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/server/GetSegmentResponseEncoderTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/server/GetSegmentResponseEncoderTest.java?rev=1760405&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/server/GetSegmentResponseEncoderTest.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/server/GetSegmentResponseEncoderTest.java Mon Sep 12 16:47:28 2016
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment.standby.server;
+
+import static org.apache.jackrabbit.oak.segment.standby.server.ServerTestUtils.hash;
+import static org.apache.jackrabbit.oak.segment.standby.server.ServerTestUtils.mockSegment;
+import static org.junit.Assert.assertEquals;
+
+import java.util.UUID;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.embedded.EmbeddedChannel;
+import org.apache.jackrabbit.oak.segment.Segment;
+import org.apache.jackrabbit.oak.segment.standby.codec.Messages;
+import org.junit.Test;
+
+public class GetSegmentResponseEncoderTest {
+
+ @Test
+ public void encodeResponse() throws Exception {
+ UUID uuid = new UUID(1, 2);
+ byte[] data = new byte[] {3, 4, 5};
+ Segment segment = mockSegment(uuid, data);
+
+ EmbeddedChannel channel = new EmbeddedChannel(new GetSegmentResponseEncoder());
+ channel.writeOutbound(new GetSegmentResponse("clientId", segment));
+ ByteBuf buffer = (ByteBuf) channel.readOutbound();
+
+ ByteBuf expected = Unpooled.buffer();
+ expected.writeInt(data.length + 25);
+ expected.writeByte(Messages.HEADER_SEGMENT);
+ expected.writeLong(uuid.getMostSignificantBits());
+ expected.writeLong(uuid.getLeastSignificantBits());
+ expected.writeLong(hash(data));
+ expected.writeBytes(data);
+
+ assertEquals(expected, buffer);
+ }
+
+}
Propchange: jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/server/GetSegmentResponseEncoderTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/server/RequestDecoderTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/server/RequestDecoderTest.java?rev=1760405&r1=1760404&r2=1760405&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/server/RequestDecoderTest.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/server/RequestDecoderTest.java Mon Sep 12 16:47:28 2016
@@ -20,6 +20,8 @@ package org.apache.jackrabbit.oak.segmen
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import java.util.UUID;
+
import io.netty.channel.embedded.EmbeddedChannel;
import org.apache.jackrabbit.oak.segment.standby.codec.Messages;
import org.junit.Test;
@@ -37,10 +39,10 @@ public class RequestDecoderTest {
@Test
public void shouldDecodeGetSegmentRequests() throws Exception {
EmbeddedChannel channel = new EmbeddedChannel(new RequestDecoder());
- channel.writeInbound(Messages.newGetSegmentRequest("clientId", "segmentId", false));
+ channel.writeInbound(Messages.newGetSegmentRequest("clientId", new UUID(1, 2).toString(), false));
GetSegmentRequest request = (GetSegmentRequest) channel.readInbound();
assertEquals("clientId", request.getClientId());
- assertEquals("segmentId", request.getSegmentId());
+ assertEquals(new UUID(1, 2), request.getSegmentId());
}
@Test