You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by sz...@apache.org on 2022/06/09 03:05:51 UTC
[ozone] 34/36: HDDS-6500. [Ozone-Streaming] Buffer the PutBlockRequest at the end of the stream. (#3229)
This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch HDDS-4454
in repository https://gitbox.apache.org/repos/asf/ozone.git
commit 96d5eb06ada19283ac2c730c7f0ae08270eac761
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Mon Mar 28 20:43:38 2022 +0800
HDDS-6500. [Ozone-Streaming] Buffer the PutBlockRequest at the end of the stream. (#3229)
---
.../hdds/scm/storage/BlockDataStreamOutput.java | 48 +++-
.../org/apache/hadoop/hdds/ratis/RatisHelper.java | 26 ++
.../hdds/scm/storage/ContainerProtocolCalls.java | 14 +-
.../server/ratis/ContainerStateMachine.java | 27 +-
.../keyvalue/impl/KeyValueStreamDataChannel.java | 235 ++++++++++++++++
.../keyvalue/impl/StreamDataChannelBase.java | 3 +-
.../impl/TestKeyValueStreamDataChannel.java | 313 +++++++++++++++++++++
.../client/rpc/TestBlockDataStreamOutput.java | 16 +-
8 files changed, 657 insertions(+), 25 deletions(-)
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
index d5b9dd9d81..d19f2aea13 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.BlockData;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.KeyValue;
import org.apache.hadoop.hdds.ratis.ContainerCommandRequestMessage;
@@ -83,6 +84,9 @@ import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls.putBlock
public class BlockDataStreamOutput implements ByteBufferStreamOutput {
public static final Logger LOG =
LoggerFactory.getLogger(BlockDataStreamOutput.class);
+
+ public static final int PUT_BLOCK_REQUEST_LENGTH_MAX = 1 << 20; // 1MB
+
public static final String EXCEPTION_MSG =
"Unexpected Storage Container Exception: ";
private static final CompletableFuture[] EMPTY_FUTURE_ARRAY = {};
@@ -406,12 +410,26 @@ public class BlockDataStreamOutput implements ByteBufferStreamOutput {
byteBufferList = null;
}
waitFuturesComplete();
+ final BlockData blockData = containerBlockData.build();
if (close) {
- dataStreamCloseReply = out.closeAsync();
+ final ContainerCommandRequestProto putBlockRequest
+ = ContainerProtocolCalls.getPutBlockRequest(
+ xceiverClient.getPipeline(), blockData, true, token);
+ dataStreamCloseReply = executePutBlockClose(putBlockRequest,
+ PUT_BLOCK_REQUEST_LENGTH_MAX, out);
+ dataStreamCloseReply.whenComplete((reply, e) -> {
+ if (e != null || reply == null || !reply.isSuccess()) {
+ LOG.warn("Failed executePutBlockClose, reply=" + reply, e);
+ try {
+ executePutBlock(true, false);
+ } catch (IOException ex) {
+ throw new CompletionException(ex);
+ }
+ }
+ });
}
try {
- BlockData blockData = containerBlockData.build();
XceiverClientReply asyncReply =
putBlockAsync(xceiverClient, blockData, close, token);
final CompletableFuture<ContainerCommandResponseProto> flushFuture
@@ -459,6 +477,30 @@ public class BlockDataStreamOutput implements ByteBufferStreamOutput {
}
}
+ public static CompletableFuture<DataStreamReply> executePutBlockClose(
+ ContainerCommandRequestProto putBlockRequest, int max,
+ DataStreamOutput out) {
+ final ByteBuffer putBlock = ContainerCommandRequestMessage.toMessage(
+ putBlockRequest, null).getContent().asReadOnlyByteBuffer();
+ final ByteBuffer protoLength = getProtoLength(putBlock, max);
+ RatisHelper.debug(putBlock, "putBlock", LOG);
+ out.writeAsync(putBlock);
+ RatisHelper.debug(protoLength, "protoLength", LOG);
+ return out.writeAsync(protoLength, StandardWriteOption.CLOSE);
+ }
+
+ /**
+ * Encodes the number of bytes remaining in the given buffer as a
+ * 4-byte integer.
+ *
+ * @param putBlock the serialized request; it is not consumed
+ * @param max the largest length accepted
+ * @return a read-only buffer holding exactly the 4 length bytes
+ * @throws IllegalStateException if the length is larger than {@code max}
+ */
+ public static ByteBuffer getProtoLength(ByteBuffer putBlock, int max) {
+ final int size = putBlock.remaining();
+ Preconditions.checkState(size <= max,
+ "protoLength== %s > max = %s", size, max);
+ // An absolute put leaves position at 0 and limit at 4: ready to be read.
+ final ByteBuffer lengthBuf = ByteBuffer.allocate(4).putInt(0, size);
+ LOG.debug("protoLength = {}", size);
+ Preconditions.checkState(lengthBuf.remaining() == 4);
+ return lengthBuf.asReadOnlyBuffer();
+ }
+
@Override
public void flush() throws IOException {
if (xceiverClientFactory != null && xceiverClient != null
@@ -547,7 +589,7 @@ public class BlockDataStreamOutput implements ByteBufferStreamOutput {
}
- private void setIoException(Exception e) {
+ private void setIoException(Throwable e) {
IOException ioe = getIoException();
if (ioe == null) {
IOException exception = new IOException(EXCEPTION_MSG + e.toString(), e);
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/RatisHelper.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/RatisHelper.java
index a114634c83..4f9844011b 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/RatisHelper.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/RatisHelper.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.hdds.ratis;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.security.cert.X509Certificate;
import java.util.ArrayList;
import java.util.Collection;
@@ -58,6 +59,7 @@ import org.apache.ratis.retry.RetryPolicy;
import org.apache.ratis.rpc.RpcType;
import org.apache.ratis.rpc.SupportedRpcType;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -436,4 +438,28 @@ public final class RatisHelper {
throw new RuntimeException(e);
}
}
+
+ /**
+ * Logs, at debug level, the given buffer followed by a dump of its
+ * remaining bytes as signed decimals, 20 per row.
+ * The position and limit of the given buffer are left untouched.
+ *
+ * @param buffer the buffer to dump
+ * @param name a label put in front of the log message
+ * @param log the logger to use; nothing is done unless debug is enabled
+ */
+ public static void debug(ByteBuffer buffer, String name, Logger log) {
+ if (!log.isDebugEnabled()) {
+ return;
+ }
+ // Drain a duplicate so that the caller's buffer is not consumed and
+ // its real position/limit (not the drained copy's) is reported below.
+ final ByteBuffer bytes = buffer.duplicate();
+ final StringBuilder builder = new StringBuilder();
+ for (int i = 1; bytes.remaining() > 0; i++) {
+ builder.append(bytes.get()).append(i % 20 == 0 ? "\n " : ", ");
+ }
+ log.debug("{}: {}\n {}", name, buffer, builder);
+ }
+
+ /**
+ * Logs, at debug level, the given buffer followed by a dump of its
+ * readable bytes as signed decimals, 20 per row.
+ * The reader and writer indices of the given buffer are left untouched.
+ *
+ * @param buf the buffer to dump
+ * @param name a label put in front of the log message
+ * @param log the logger to use; nothing is done unless debug is enabled
+ */
+ public static void debug(ByteBuf buf, String name, Logger log) {
+ if (!log.isDebugEnabled()) {
+ return;
+ }
+ // Read from a duplicate (independent indices) so that the caller's
+ // buffer, with its real indices, is what gets reported below.
+ final ByteBuf bytes = buf.duplicate();
+ final StringBuilder builder = new StringBuilder();
+ for (int i = 1; bytes.readableBytes() > 0; i++) {
+ builder.append(bytes.readByte()).append(i % 20 == 0 ? "\n " : ", ");
+ }
+ log.debug("{}: {}\n {}", name, buf, builder);
+ }
}
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
index b5365820e3..25d06fd18a 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
@@ -56,6 +56,7 @@ import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.hadoop.hdds.scm.container.common.helpers.BlockNotCommittedException;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
import org.apache.hadoop.ozone.common.Checksum;
import org.apache.hadoop.ozone.common.ChecksumData;
@@ -233,11 +234,19 @@ public final class ContainerProtocolCalls {
XceiverClientSpi xceiverClient, BlockData containerBlockData, boolean eof,
Token<? extends TokenIdentifier> token)
throws IOException, InterruptedException, ExecutionException {
+ final ContainerCommandRequestProto request = getPutBlockRequest(
+ xceiverClient.getPipeline(), containerBlockData, eof, token);
+ return xceiverClient.sendCommandAsync(request);
+ }
+
+ public static ContainerCommandRequestProto getPutBlockRequest(
+ Pipeline pipeline, BlockData containerBlockData, boolean eof,
+ Token<? extends TokenIdentifier> token) throws IOException {
PutBlockRequestProto.Builder createBlockRequest =
PutBlockRequestProto.newBuilder()
.setBlockData(containerBlockData)
.setEof(eof);
- String id = xceiverClient.getPipeline().getFirstNode().getUuidString();
+ final String id = pipeline.getFirstNode().getUuidString();
ContainerCommandRequestProto.Builder builder =
ContainerCommandRequestProto.newBuilder().setCmdType(Type.PutBlock)
.setContainerID(containerBlockData.getBlockID().getContainerID())
@@ -246,8 +255,7 @@ public final class ContainerProtocolCalls {
if (token != null) {
builder.setEncodedToken(token.encodeToUrlString());
}
- ContainerCommandRequestProto request = builder.build();
- return xceiverClient.sendCommandAsync(request);
+ return builder.build();
}
/**
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
index 916d3e7f5b..b9707e5af2 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
@@ -427,6 +427,20 @@ public class ContainerStateMachine extends BaseStateMachine {
return dispatchCommand(requestProto, context);
}
+ /**
+ * Runs the given command asynchronously on {@code executor}, using a
+ * COMMIT_DATA dispatcher context carrying the term and the index of
+ * the given log entry.
+ *
+ * @param requestProto the command to run
+ * @param entry the log entry supplying the term and the log index
+ * @return a future of the command response
+ */
+ private CompletableFuture<ContainerCommandResponseProto> runCommandAsync(
+ ContainerCommandRequestProto requestProto, LogEntryProto entry) {
+ // The context is built inside the task, right before the command runs.
+ return CompletableFuture.supplyAsync(
+ () -> runCommand(requestProto, new DispatcherContext.Builder()
+ .setTerm(entry.getTerm())
+ .setLogIndex(entry.getIndex())
+ .setStage(DispatcherContext.WriteChunkStage.COMMIT_DATA)
+ .setContainer2BCSIDMap(container2BCSIDMap)
+ .build()),
+ executor);
+ }
+
private CompletableFuture<Message> handleWriteChunk(
ContainerCommandRequestProto requestProto, long entryIndex, long term,
long startTime) {
@@ -560,19 +574,16 @@ public class ContainerStateMachine extends BaseStateMachine {
"DataStream: " + stream + " is not closed properly"));
}
- final CompletableFuture<ContainerCommandResponseProto> f;
+ final ContainerCommandRequestProto request;
if (dataChannel instanceof KeyValueStreamDataChannel) {
- f = CompletableFuture.completedFuture(null);
+ request = ((KeyValueStreamDataChannel) dataChannel).getPutBlockRequest();
} else {
return JavaUtils.completeExceptionally(new IllegalStateException(
"Unexpected DataChannel " + dataChannel.getClass()));
}
- return f.whenComplete((res, e) -> {
- if (LOG.isDebugEnabled()) {
- LOG.debug("PutBlock {} Term: {} Index: {}",
- res.getResult(), entry.getTerm(), entry.getIndex());
- }
- });
+ return runCommandAsync(request, entry).whenComplete(
+ (res, e) -> LOG.debug("link {}, entry: {}, request: {}",
+ res.getResult(), entry, request));
}
private ExecutorService getChunkExecutor(WriteChunkRequestProto req) {
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/KeyValueStreamDataChannel.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/KeyValueStreamDataChannel.java
index 66723031f0..99dc40f5d0 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/KeyValueStreamDataChannel.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/KeyValueStreamDataChannel.java
@@ -18,17 +18,131 @@
package org.apache.hadoop.ozone.container.keyvalue.impl;
+import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
+import org.apache.hadoop.hdds.ratis.ContainerCommandRequestMessage;
+import org.apache.hadoop.hdds.ratis.RatisHelper;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.hdds.scm.storage.BlockDataStreamOutput;
import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
import org.apache.hadoop.ozone.container.common.impl.ContainerData;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
+import org.apache.ratis.thirdparty.io.netty.buffer.Unpooled;
+import org.apache.ratis.util.ReferenceCountedObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
/**
* This class is used to get the DataChannel for streaming.
*/
public class KeyValueStreamDataChannel extends StreamDataChannelBase {
+ public static final Logger LOG =
+ LoggerFactory.getLogger(KeyValueStreamDataChannel.class);
+
+ /**
+ * Keep the last {@link Buffers#max} bytes in the buffer
+ * in order to create putBlockRequest
+ * at {@link #closeBuffers(Buffers, WriteMethod)}.
+ * Offered buffers are retained and queued in FIFO order; the head buffer
+ * is handed back for writing only while at least {@link Buffers#max}
+ * bytes would still remain queued after removing it.
+ */
+ static class Buffers {
+ private final Deque<ReferenceCountedObject<ByteBuffer>> deque
+ = new LinkedList<>();
+ private final int max; // number of trailing bytes that must stay queued
+ private int length; // sum of remaining() over the queued buffers, as tracked by offer/next
+
+ Buffers(int max) {
+ this.max = max;
+ }
+
+ private boolean isExtra(int n) { // true if at least max bytes remain after removing n bytes
+ return length - n >= max;
+ }
+
+ private boolean hasExtraBuffer() { // true if the queue is non-empty and its head can be removed
+ return Optional.ofNullable(deque.peek())
+ .map(ReferenceCountedObject::get)
+ .filter(b -> isExtra(b.remaining()))
+ .isPresent();
+ }
+
+ /**
+ * Retains the given buffer and appends it to the queue.
+ * The returned iterable is lazy: each call of next() removes the head
+ * buffer from the queue, so nothing is removed unless it is iterated.
+ * The returned buffers are still retained; writeBuffers releases each
+ * of them after writing it.
+ *
+ * @return extra buffers which are safe to be written.
+ */
+ Iterable<ReferenceCountedObject<ByteBuffer>> offer(
+ ReferenceCountedObject<ByteBuffer> ref) {
+ final ByteBuffer buffer = ref.retain(); // keep the buffer alive while it is queued
+ LOG.debug("offer {}", buffer);
+ final boolean offered = deque.offer(ref);
+ Preconditions.checkState(offered, "Failed to offer");
+ length += buffer.remaining();
+
+ return () -> new Iterator<ReferenceCountedObject<ByteBuffer>>() {
+ @Override
+ public boolean hasNext() {
+ return hasExtraBuffer();
+ }
+
+ @Override
+ public ReferenceCountedObject<ByteBuffer> next() { // NOTE(review): poll() throws NullPointerException, not NoSuchElementException, on an empty queue
+ final ReferenceCountedObject<ByteBuffer> polled = poll();
+ length -= polled.get().remaining();
+ Preconditions.checkState(length >= max); // holds whenever hasNext() returned true
+ return polled;
+ }
+ };
+ }
+
+ ReferenceCountedObject<ByteBuffer> poll() { // removes the head; fails if the queue is empty; does not update length
+ final ReferenceCountedObject<ByteBuffer> polled
+ = Objects.requireNonNull(deque.poll());
+ RatisHelper.debug(polled.get(), "polled", LOG);
+ return polled;
+ }
+
+ ReferenceCountedObject<ByteBuf> pollAll() { // drains the queue into one read-only ByteBuf wrapping all queued buffers
+ Preconditions.checkState(!deque.isEmpty(), "The deque is empty");
+ final ByteBuffer[] array = new ByteBuffer[deque.size()];
+ final List<ReferenceCountedObject<ByteBuffer>> refs
+ = new ArrayList<>(deque.size());
+ for (int i = 0; i < array.length; i++) {
+ final ReferenceCountedObject<ByteBuffer> ref = poll(); // length is not adjusted here; presumably fine since this is used at close -- TODO confirm
+ refs.add(ref);
+ array[i] = ref.get();
+ }
+ final ByteBuf buf = Unpooled.wrappedBuffer(array).asReadOnly();
+ return ReferenceCountedObject.wrap(buf, () -> { // retain: nothing to do
+ }, () -> { // release: free the wrapper, then every buffer retained by offer
+ buf.release();
+ refs.forEach(ReferenceCountedObject::release);
+ });
+ }
+ }
+
+ /**
+ * A write operation that may throw {@link IOException};
+ * implemented in this change by lambdas and method references.
+ */
+ @FunctionalInterface
+ interface WriteMethod {
+ /**
+ * Writes bytes from the given buffer.
+ *
+ * @param src the buffer to write from
+ * @return the number of bytes written
+ * @throws IOException if the write fails
+ */
+ int applyAsInt(ByteBuffer src) throws IOException;
+ }
+
+ private final Buffers buffers = new Buffers(
+ BlockDataStreamOutput.PUT_BLOCK_REQUEST_LENGTH_MAX);
+ private final AtomicReference<ContainerCommandRequestProto> putBlockRequest
+ = new AtomicReference<>();
+ private final AtomicBoolean closed = new AtomicBoolean();
+
KeyValueStreamDataChannel(File file, ContainerData containerData,
ContainerMetrics metrics)
throws StorageContainerException {
@@ -39,4 +153,125 @@ public class KeyValueStreamDataChannel extends StreamDataChannelBase {
ContainerProtos.Type getType() {
return ContainerProtos.Type.StreamWrite;
}
+
+ /**
+ * Queues the given buffer and writes to the file the queued buffers,
+ * if any, that are no longer needed for extracting the PutBlock request.
+ *
+ * @return the number of bytes remaining in the given buffer
+ * @throws IOException if this channel is already closed or a write fails
+ */
+ @Override
+ public int write(ReferenceCountedObject<ByteBuffer> referenceCounted)
+ throws IOException {
+ assertOpen();
+ final WriteMethod toFile = super::writeFileChannel;
+ return writeBuffers(referenceCounted, buffers, toFile);
+ }
+
+ static int writeBuffers(ReferenceCountedObject<ByteBuffer> src,
+ Buffers buffers, WriteMethod writeMethod)
+ throws IOException {
+ for (ReferenceCountedObject<ByteBuffer> b : buffers.offer(src)) {
+ try {
+ writeFully(b.get(), writeMethod);
+ } finally {
+ b.release();
+ }
+ }
+ return src.get().remaining();
+ }
+
+ /**
+ * Calls the given write method repeatedly until the buffer has no
+ * remaining bytes.
+ *
+ * @throws IOException if the write method fails or reports that it
+ * wrote zero or fewer bytes
+ */
+ private static void writeFully(ByteBuffer b, WriteMethod writeMethod)
+ throws IOException {
+ while (b.hasRemaining()) {
+ // A non-positive count means no progress; fail instead of spinning.
+ if (writeMethod.applyAsInt(b) <= 0) {
+ throw new IOException("Unable to write");
+ }
+ }
+ }
+
+ /**
+ * @return the PutBlock request stored by {@link #close()}.
+ * @throws NullPointerException if no request has been stored
+ */
+ public ContainerCommandRequestProto getPutBlockRequest() {
+ final ContainerCommandRequestProto request = putBlockRequest.get();
+ if (request == null) {
+ throw new NullPointerException("putBlockRequest == null, " + this);
+ }
+ return request;
+ }
+
+ void assertOpen() throws IOException {
+ if (closed.get()) {
+ throw new IOException("Already closed: " + this);
+ }
+ }
+
+ /**
+ * Closes this channel; only the first call has any effect.
+ * The PutBlock request is extracted from the buffered tail of the
+ * stream, the remaining buffered data is written to the file and then
+ * the base class is closed.
+ *
+ * @throws IOException if the PutBlock request cannot be read, the
+ * remaining data cannot be written or the base class fails to close
+ */
+ @Override
+ public void close() throws IOException {
+ if (closed.compareAndSet(false, true)) {
+ try {
+ putBlockRequest.set(closeBuffers(buffers, super::writeFileChannel));
+ } finally {
+ // Always close the base class, even when the buffered tail is
+ // malformed: the closed flag is already set, so a later close()
+ // would do nothing and the underlying resources would leak.
+ super.close();
+ }
+ }
+ }
+
+ static ContainerCommandRequestProto closeBuffers( // drains buffers: returns the trailing PutBlock request, writes the data before it
+ Buffers buffers, WriteMethod writeMethod) throws IOException {
+ final ReferenceCountedObject<ByteBuf> ref = buffers.pollAll(); // throws IllegalStateException if nothing is queued
+ final ByteBuf buf = ref.retain();
+ final ContainerCommandRequestProto putBlockRequest;
+ try {
+ putBlockRequest = readPutBlockRequest(buf); // also moves buf's writerIndex back to the start of the proto
+ // write the remaining data, i.e. the bytes in front of the proto
+ writeFully(buf.nioBuffer(), writeMethod);
+ } finally {
+ ref.release(); // releases the wrapper and all the queued buffers, on success or failure
+ }
+ return putBlockRequest;
+ }
+
+ private static int readProtoLength(ByteBuf b, int lengthIndex) { // reads the int at lengthIndex; moves b's readerIndex (the caller passes a duplicate)
+ final int readerIndex = b.readerIndex();
+ LOG.debug("{}, lengthIndex = {}, readerIndex = {}",
+ b, lengthIndex, readerIndex);
+ if (lengthIndex > readerIndex) {
+ b.readerIndex(lengthIndex); // skip everything in front of the length field
+ } else {
+ Preconditions.checkState(lengthIndex == readerIndex); // fails if lengthIndex is in front of readerIndex
+ }
+ RatisHelper.debug(b, "readProtoLength", LOG);
+ return b.nioBuffer().getInt(); // the readable bytes now start at lengthIndex
+ }
+
+ static ContainerCommandRequestProto readPutBlockRequest(ByteBuf b) // parses the trailing proto, then shrinks b to the data in front of it
+ throws IOException {
+ // The last 4 readable bytes hold the proto length; the proto sits right in front of them.
+ // readerIndex protoIndex lengthIndex readerIndex+readableBytes
+ // V V V V
+ // format: |--- data ---|--- proto ---|--- proto length (4 bytes) ---|
+ final int readerIndex = b.readerIndex();
+ final int lengthIndex = readerIndex + b.readableBytes() - 4;
+ final int protoLength = readProtoLength(b.duplicate(), lengthIndex); // NOTE(review): with fewer than 4 readable bytes this fails with IllegalStateException, not IOException
+ final int protoIndex = lengthIndex - protoLength;
+
+ final ContainerCommandRequestProto proto;
+ try {
+ proto = readPutBlockRequest(b.slice(protoIndex, protoLength).nioBuffer());
+ } catch (Throwable t) { // any failure, including a bad index/length, is reported as an IOException
+ RatisHelper.debug(b, "catch", LOG);
+ throw new IOException("Failed to readPutBlockRequest from " + b
+ + ": readerIndex=" + readerIndex
+ + ", protoIndex=" + protoIndex
+ + ", protoLength=" + protoLength
+ + ", lengthIndex=" + lengthIndex, t);
+ }
+
+ // set index for reading data: the readable bytes now end where the proto starts
+ b.writerIndex(protoIndex);
+
+ return proto;
+ }
+
+ private static ContainerCommandRequestProto readPutBlockRequest(ByteBuffer b)
+ throws IOException {
+ RatisHelper.debug(b, "readPutBlockRequest", LOG);
+ final ByteString byteString = ByteString.copyFrom(b);
+
+ final ContainerCommandRequestProto request =
+ ContainerCommandRequestMessage.toProto(byteString, null);
+
+ if (!request.hasPutBlock()) {
+ throw new StorageContainerException(
+ "Malformed PutBlock request. trace ID: " + request.getTraceID(),
+ ContainerProtos.Result.MALFORMED_REQUEST);
+ }
+ return request;
+ }
}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/StreamDataChannelBase.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/StreamDataChannelBase.java
index b31e2ccbf4..9829033248 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/StreamDataChannelBase.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/StreamDataChannelBase.java
@@ -79,8 +79,7 @@ abstract class StreamDataChannelBase implements StateMachine.DataChannel {
randomAccessFile.close();
}
- @Override
- public int write(ByteBuffer src) throws IOException {
+ final int writeFileChannel(ByteBuffer src) throws IOException {
final int writeBytes = getChannel().write(src);
metrics.incContainerBytesStats(getType(), writeBytes);
containerData.updateWriteStats(writeBytes, false);
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestKeyValueStreamDataChannel.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestKeyValueStreamDataChannel.java
new file mode 100644
index 0000000000..d252b1cb1b
--- /dev/null
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestKeyValueStreamDataChannel.java
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.keyvalue.impl;
+
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.BlockData;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.DatanodeBlockID;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.PutBlockRequestProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type;
+import org.apache.hadoop.hdds.ratis.ContainerCommandRequestMessage;
+import org.apache.hadoop.ozone.container.keyvalue.impl.KeyValueStreamDataChannel.Buffers;
+import org.apache.hadoop.ozone.container.keyvalue.impl.KeyValueStreamDataChannel.WriteMethod;
+import org.apache.ratis.client.api.DataStreamOutput;
+import org.apache.ratis.io.FilePositionCount;
+import org.apache.ratis.io.StandardWriteOption;
+import org.apache.ratis.io.WriteOption;
+import org.apache.ratis.proto.RaftProtos.CommitInfoProto;
+import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto;
+import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.DataStreamReply;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
+import org.apache.ratis.thirdparty.io.netty.buffer.Unpooled;
+import org.apache.ratis.util.ReferenceCountedObject;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.apache.hadoop.hdds.scm.storage.BlockDataStreamOutput.PUT_BLOCK_REQUEST_LENGTH_MAX;
+import static org.apache.hadoop.hdds.scm.storage.BlockDataStreamOutput.executePutBlockClose;
+import static org.apache.hadoop.hdds.scm.storage.BlockDataStreamOutput.getProtoLength;
+import static org.apache.hadoop.ozone.container.keyvalue.impl.KeyValueStreamDataChannel.closeBuffers;
+import static org.apache.hadoop.ozone.container.keyvalue.impl.KeyValueStreamDataChannel.readPutBlockRequest;
+import static org.apache.hadoop.ozone.container.keyvalue.impl.KeyValueStreamDataChannel.writeBuffers;
+
+/** For testing {@link KeyValueStreamDataChannel}. */
+public class TestKeyValueStreamDataChannel {
+ public static final Logger LOG =
+ LoggerFactory.getLogger(TestKeyValueStreamDataChannel.class);
+
+ static final ContainerCommandRequestProto PUT_BLOCK_PROTO
+ = ContainerCommandRequestProto.newBuilder()
+ .setCmdType(Type.PutBlock)
+ .setPutBlock(PutBlockRequestProto.newBuilder().setBlockData(
+ BlockData.newBuilder().setBlockID(DatanodeBlockID.newBuilder()
+ .setContainerID(222).setLocalID(333).build()).build()))
+ .setDatanodeUuid("datanodeId")
+ .setContainerID(111L)
+ .build();
+ static final int PUT_BLOCK_PROTO_SIZE = PUT_BLOCK_PROTO.toByteString().size();
+ static {
+ LOG.info("PUT_BLOCK_PROTO_SIZE = {}", PUT_BLOCK_PROTO_SIZE);
+ }
+
+ @Test
+ public void testSerialization() throws Exception {
+ final int max = PUT_BLOCK_REQUEST_LENGTH_MAX;
+ final ByteBuffer putBlockBuf = ContainerCommandRequestMessage.toMessage(
+ PUT_BLOCK_PROTO, null).getContent().asReadOnlyByteBuffer();
+ final ByteBuffer protoLengthBuf = getProtoLength(putBlockBuf, max);
+
+ // random data size
+ final int dataSize = ThreadLocalRandom.current().nextInt(1000) + 100;
+ final byte[] data = new byte[dataSize];
+
+ //serialize
+ final ByteBuf buf = Unpooled.buffer(max);
+ buf.writeBytes(data);
+ buf.writeBytes(putBlockBuf);
+ buf.writeBytes(protoLengthBuf);
+
+ final ContainerCommandRequestProto proto = readPutBlockRequest(buf);
+ Assert.assertEquals(PUT_BLOCK_PROTO, proto);
+ }
+
+ /**
+ * Runs {@link #runTestBuffers(int, int)} concurrently for every
+ * combination of the buffer limits and the data sizes below.
+ */
+ @Test
+ public void testBuffers() throws Exception {
+ final ExecutorService executor = Executors.newFixedThreadPool(32);
+ try {
+ final List<CompletableFuture<String>> futures = new ArrayList<>();
+
+ // The smallest limit holds exactly the request and its 4-byte length.
+ final int min = PUT_BLOCK_PROTO_SIZE + 4;
+ final int[] maxValues = {min, 2 * min, 10 * min};
+ final int[] dataSizes = {0, 10, 100, 10_000};
+ for (int max : maxValues) {
+ for (int dataSize : dataSizes) {
+ futures.add(CompletableFuture.supplyAsync(
+ () -> runTestBuffers(dataSize, max), executor));
+ }
+ }
+
+ for (CompletableFuture<String> f : futures) {
+ f.get();
+ }
+ } finally {
+ // Do not leave the pool threads behind, whether the test passed or not.
+ executor.shutdown();
+ }
+ }
+
+ /**
+ * Runs one case with a fresh random seed; the seed is part of the
+ * returned name and of the failure message.
+ *
+ * @return the name of the case
+ * @throws CompletionException if the case fails
+ */
+ static String runTestBuffers(int dataSize, int max) {
+ final int seed = ThreadLocalRandom.current().nextInt();
+ final String name = String.format("[dataSize=%d,max=%d,seed=%H]",
+ dataSize, max, seed);
+ LOG.info(name);
+ try {
+ runTestBuffers(dataSize, max, seed, name);
+ return name;
+ } catch (Throwable t) {
+ throw new CompletionException("Failed " + name, t);
+ }
+ }
+
+ static void runTestBuffers(int dataSize, int max, int seed, String name) // writes random data in random-sized pieces, closes, then verifies the output
+ throws Exception {
+ Assert.assertTrue(max >= PUT_BLOCK_PROTO_SIZE);
+
+ // random data, reproducible from the seed
+ final byte[] data = new byte[dataSize];
+ final Random random = new Random(seed);
+ random.nextBytes(data);
+
+ // write output
+ final Buffers buffers = new Buffers(max);
+ final Output out = new Output(buffers);
+ for (int offset = 0; offset < dataSize;) {
+ final int randomLength = random.nextInt(4 * max); // may be 0, i.e. an empty write
+ final int length = Math.min(randomLength, dataSize - offset);
+ LOG.info("{}: offset = {}, length = {}", name, offset, length);
+ final ByteBuffer b = ByteBuffer.wrap(data, offset, length);
+ final DataStreamReply writeReply = out.writeAsync(b).get();
+ assertReply(writeReply, length, null); // a plain write carries no PutBlock request
+ offset += length;
+ }
+
+ // close: appends the request and its length, then closes the stream
+ final DataStreamReply closeReply = executePutBlockClose(
+ PUT_BLOCK_PROTO, max, out).get();
+ assertReply(closeReply, 0, PUT_BLOCK_PROTO);
+
+ // check output: exactly the data, without the request and its length
+ final ByteBuf outBuf = out.getOutBuf();
+ LOG.info("outBuf = {}", outBuf);
+ Assert.assertEquals(dataSize, outBuf.readableBytes());
+ for (int i = 0; i < dataSize; i++) {
+ Assert.assertEquals(data[i], outBuf.readByte());
+ }
+ outBuf.release();
+ }
+
+ static void assertReply(DataStreamReply reply, int byteWritten,
+ ContainerCommandRequestProto proto) {
+ Assert.assertTrue(reply.isSuccess());
+ Assert.assertEquals(byteWritten, reply.getBytesWritten());
+ Assert.assertEquals(proto, ((Reply)reply).getPutBlockRequest());
+ }
+
+ static class Output implements DataStreamOutput { // in-memory stream: feeds writes through Buffers and collects the written bytes in outBuf
+ private final Buffers buffers;
+ private final ByteBuf outBuf = Unpooled.buffer(); // receives everything passed to writeMethod
+ private final WriteMethod writeMethod = src -> { // appends all of src to outBuf and reports its size as written
+ final int remaining = src.remaining();
+ outBuf.writeBytes(src);
+ return remaining;
+ };
+
+ Output(Buffers buffers) {
+ this.buffers = buffers;
+ }
+
+ ByteBuf getOutBuf() {
+ return outBuf;
+ }
+
+ @Override
+ public CompletableFuture<DataStreamReply> writeAsync( // completes synchronously; with the CLOSE option the reply is the one of closeAsync()
+ ByteBuffer src, WriteOption... writeOptions) {
+ final int written;
+ try {
+ written = writeBuffers(
+ ReferenceCountedObject.wrap(src, () -> { }, () -> { }), // retain and release are no-ops here
+ buffers, writeMethod);
+ } catch (IOException e) {
+ return completeExceptionally(e);
+ }
+ if (WriteOption.containsOption(writeOptions, StandardWriteOption.CLOSE)) {
+ return closeAsync();
+ }
+ return CompletableFuture.completedFuture(
+ new Reply(true, written));
+ }
+
+ @Override
+ public CompletableFuture<DataStreamReply> closeAsync() { // drains the buffers; the reply carries the extracted PutBlock request
+ final ContainerCommandRequestProto putBlockRequest;
+ try {
+ putBlockRequest = closeBuffers(buffers, writeMethod);
+ } catch (IOException e) {
+ return completeExceptionally(e);
+ }
+ return CompletableFuture.completedFuture(
+ new Reply(true, 0, putBlockRequest));
+ }
+
+ @Override
+ public CompletableFuture<DataStreamReply> writeAsync( // not supported by this test double
+ FilePositionCount filePositionCount, WriteOption... writeOptions) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public CompletableFuture<RaftClientReply> getRaftClientReplyFuture() { // not supported by this test double
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public WritableByteChannel getWritableByteChannel() { // not supported by this test double
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ static class Reply implements DataStreamReply { // minimal reply: success flag, byte count and an optional PutBlock request
+ private final boolean success;
+ private final long bytesWritten;
+ private final ContainerCommandRequestProto putBlockRequest; // null unless given to the 3-argument constructor
+
+ Reply(boolean success, long bytesWritten) { // a reply without a PutBlock request
+ this(success, bytesWritten, null);
+ }
+
+ Reply(boolean success, long bytesWritten,
+ ContainerCommandRequestProto putBlockRequest) {
+ this.success = success;
+ this.bytesWritten = bytesWritten;
+ this.putBlockRequest = putBlockRequest;
+ }
+
+ ContainerCommandRequestProto getPutBlockRequest() { // may return null
+ return putBlockRequest;
+ }
+
+ @Override
+ public boolean isSuccess() {
+ return success;
+ }
+
+ @Override
+ public long getBytesWritten() {
+ return bytesWritten;
+ }
+
+ @Override
+ public Collection<CommitInfoProto> getCommitInfos() { // this and the methods below are not supported
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public ClientId getClientId() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public DataStreamPacketHeaderProto.Type getType() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public long getStreamId() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public long getStreamOffset() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public long getDataLength() {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ /**
+ * @return a new future already completed exceptionally with {@code t}.
+ */
+ static CompletableFuture<DataStreamReply> completeExceptionally(Throwable t) {
+ final CompletableFuture<DataStreamReply> failed
+ = new CompletableFuture<>();
+ failed.completeExceptionally(t);
+ return failed;
+ }
+}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java
index 65f7348740..c8a0115a80 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java
@@ -120,7 +120,7 @@ public class TestBlockDataStreamOutput {
objectStore.getVolume(volumeName).createBucket(bucketName);
}
- private String getKeyName() {
+ static String getKeyName() {
return UUID.randomUUID().toString();
}
@@ -158,13 +158,11 @@ public class TestBlockDataStreamOutput {
testWriteWithFailure(blockSize + 50);
}
- private void testWrite(int dataLength) throws Exception {
+ static void testWrite(int dataLength) throws Exception {
String keyName = getKeyName();
OzoneDataStreamOutput key = createKey(
keyName, ReplicationType.RATIS, dataLength);
- byte[] data =
- ContainerTestHelper.getFixedLengthString(keyString, dataLength)
- .getBytes(UTF_8);
+ final byte[] data = ContainerTestHelper.generateData(dataLength, false);
key.write(ByteBuffer.wrap(data));
// now close the stream, It will update the key length.
key.close();
@@ -221,14 +219,14 @@ public class TestBlockDataStreamOutput {
}
- private OzoneDataStreamOutput createKey(String keyName, ReplicationType type,
+ static OzoneDataStreamOutput createKey(String keyName, ReplicationType type,
long size) throws Exception {
return TestHelper.createStreamKey(
keyName, type, size, objectStore, volumeName, bucketName);
}
- private void validateData(String keyName, byte[] data) throws Exception {
- TestHelper
- .validateData(keyName, data, objectStore, volumeName, bucketName);
+ static void validateData(String keyName, byte[] data) throws Exception {
+ TestHelper.validateData(
+ keyName, data, objectStore, volumeName, bucketName);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org