You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by sz...@apache.org on 2022/03/15 06:56:43 UTC
[ozone] 18/30: HDDS-5851. [Ozone-Streaming] Define a PutBlock/maxBuffer fixed boundary for streaming writes. (#2866)
This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch HDDS-4454
in repository https://gitbox.apache.org/repos/asf/ozone.git
commit d75ea4416c90f74abcc19c0cc35b19810cf6620b
Author: Sadanand Shenoy <sa...@gmail.com>
AuthorDate: Wed Dec 1 11:00:46 2021 +0530
HDDS-5851. [Ozone-Streaming] Define a PutBlock/maxBuffer fixed boundary for streaming writes. (#2866)
---
.../apache/hadoop/hdds/scm/OzoneClientConfig.java | 31 ++++++++++++++++++++
.../hdds/scm/storage/BlockDataStreamOutput.java | 29 ++++++++++++++----
.../org/apache/hadoop/ozone/MiniOzoneCluster.java | 12 ++++++++
.../apache/hadoop/ozone/MiniOzoneClusterImpl.java | 11 +++++++
.../client/rpc/TestBlockDataStreamOutput.java | 34 ++++++++++++++++++++++
5 files changed, 111 insertions(+), 6 deletions(-)
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java
index 064ce6e..7fc9bbf 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java
@@ -66,6 +66,23 @@ public class OzoneClientConfig {
tags = ConfigTag.CLIENT)
private int streamBufferSize = 4 * 1024 * 1024;
+ @Config(key = "datastream.max.buffer.size",
+ defaultValue = "4MB",
+ type = ConfigType.SIZE,
+ description = "The maximum size of each ByteBuffer (StreamBuffer) "
+ + "used when writing via ratis streaming",
+ tags = ConfigTag.CLIENT)
+ private int dataStreamMaxBufferSize = 4 * 1024 * 1024;
+
+ @Config(key = "datastream.buffer.flush.size",
+ defaultValue = "16MB",
+ type = ConfigType.SIZE,
+ description = "The boundary, in bytes of streamed data, at which "
+ + "putBlock is executed; must be greater than "
+ + "datastream.max.buffer.size",
+ tags = ConfigTag.CLIENT)
+ private long dataStreamBufferFlushSize = 16 * 1024 * 1024;
+
@Config(key = "stream.buffer.increment",
defaultValue = "0B",
type = ConfigType.SIZE,
@@ -194,6 +211,14 @@ public class OzoneClientConfig {
this.streamBufferSize = streamBufferSize;
}
+ public int getDataStreamMaxBufferSize() {
+ return dataStreamMaxBufferSize;
+ }
+
+ public void setDataStreamMaxBufferSize(int dataStreamMaxBufferSize) {
+ this.dataStreamMaxBufferSize = dataStreamMaxBufferSize;
+ }
+
public boolean isStreamBufferFlushDelay() {
return streamBufferFlushDelay;
}
@@ -254,6 +279,14 @@ public class OzoneClientConfig {
return bufferIncrement;
}
+ public long getDataStreamBufferFlushSize() {
+ return dataStreamBufferFlushSize;
+ }
+
+ public void setDataStreamBufferFlushSize(long dataStreamBufferFlushSize) {
+ this.dataStreamBufferFlushSize = dataStreamBufferFlushSize;
+ }
+
public ChecksumCombineMode getChecksumCombineMode() {
try {
return ChecksumCombineMode.valueOf(checksumCombineMode);
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
index aada48e..6f5a543 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
@@ -257,13 +257,53 @@ public class BlockDataStreamOutput implements ByteBufferStreamOutput {
if (len == 0) {
return;
}
+ int curLen = len;
+ // set limit on the number of bytes that a ByteBuffer(StreamBuffer) can hold
+ int maxBufferLen = config.getDataStreamMaxBufferSize();
+ // Validate the streaming configuration before any data is sent, so that a
+ // bad configuration fails fast instead of after the first chunk is written.
+ long boundary = getPutBlockBoundary();
+ while (curLen > 0) {
+ int writeLen = Math.min(curLen, maxBufferLen);
+ final StreamBuffer buf = new StreamBuffer(b, off, writeLen);
+ off += writeLen;
+ bufferList.add(buf);
+ writeChunkToContainer(buf.duplicate());
+ curLen -= writeLen;
+ writtenDataLength += writeLen;
+ doFlushIfNeeded(boundary);
+ }
+ }
- final StreamBuffer buf = new StreamBuffer(b, off, len);
- bufferList.add(buf);
-
- writeChunkToContainer(buf.duplicate());
-
- writtenDataLength += len;
+ /**
+ * Returns how many StreamBuffers make up one putBlock interval, i.e.
+ * datastream.buffer.flush.size / datastream.max.buffer.size.
+ *
+ * @throws IllegalArgumentException if the max buffer size is not positive,
+ * or the flush size is not greater than the max buffer size.
+ */
+ private long getPutBlockBoundary() {
+ int maxBufferSize = config.getDataStreamMaxBufferSize();
+ long flushSize = config.getDataStreamBufferFlushSize();
+ Preconditions.checkArgument(maxBufferSize > 0,
+ "datastream.max.buffer.size must be positive, but was %s",
+ maxBufferSize);
+ Preconditions.checkArgument(flushSize > maxBufferSize,
+ "datastream.buffer.flush.size (%s) must be greater than "
+ + "datastream.max.buffer.size (%s)", flushSize, maxBufferSize);
+ return flushSize / maxBufferSize;
+ }
+
+ /**
+ * Executes a putBlock whenever the number of buffered StreamBuffers is a
+ * multiple of {@code boundary}.
+ */
+ private void doFlushIfNeeded(long boundary) throws IOException {
+ if (bufferList.size() % boundary == 0) {
+ updateFlushLength();
+ executePutBlock(false, false);
+ }
+ }
}
private void updateFlushLength() {
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
index 923e0d0..73ee047 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
@@ -323,6 +323,8 @@ public interface MiniOzoneCluster {
protected Optional<Integer> chunkSize = Optional.empty();
protected OptionalInt streamBufferSize = OptionalInt.empty();
protected Optional<Long> streamBufferFlushSize = Optional.empty();
+ protected Optional<Long> dataStreamBufferFlushSize = Optional.empty();
+ protected OptionalInt dataStreamMaxBufferSize = OptionalInt.empty();
protected Optional<Long> streamBufferMaxSize = Optional.empty();
protected Optional<Long> blockSize = Optional.empty();
protected Optional<StorageUnit> streamBufferSizeUnit = Optional.empty();
@@ -557,6 +559,36 @@ public interface MiniOzoneCluster {
return this;
}
+ /**
+ * Sets the maximum size of a single data stream buffer.
+ *
+ * @param size max buffer size, in the stream buffer size unit
+ * @return this builder
+ */
+ public Builder setDataStreamBufferMaxSize(int size) {
+ dataStreamMaxBufferSize = OptionalInt.of(size);
+ return this;
+ }
+
+ /**
+ * Sets the data stream flush size at which putBlock is executed.
+ *
+ * @param size flush size, in the stream buffer size unit
+ * @return this builder
+ */
+ public Builder setDataStreamBufferFlushSize(long size) {
+ dataStreamBufferFlushSize = Optional.of(size);
+ return this;
+ }
+
+ /**
+ * @deprecated misspelled; use {@link #setDataStreamBufferFlushSize(long)}.
+ */
+ @Deprecated
+ public Builder setDataStreamBufferFlushize(long size) {
+ return setDataStreamBufferFlushSize(size);
+ }
+
/**
* Sets the block size for stream buffer.
*
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
index 7959e0e..9778c02 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
@@ -654,6 +654,14 @@ public class MiniOzoneClusterImpl implements MiniOzoneCluster {
if (!streamBufferMaxSize.isPresent()) {
streamBufferMaxSize = Optional.of(2 * streamBufferFlushSize.get());
}
+ // Defaults: flush size is 4 chunks and each stream buffer holds at most
+ // one chunk, so the client executes a putBlock every 4 buffers.
+ if (!dataStreamBufferFlushSize.isPresent()) {
+ dataStreamBufferFlushSize = Optional.of((long) 4 * chunkSize.get());
+ }
+ if (!dataStreamMaxBufferSize.isPresent()) {
+ dataStreamMaxBufferSize = OptionalInt.of(chunkSize.get());
+ }
if (!blockSize.isPresent()) {
blockSize = Optional.of(2 * streamBufferMaxSize.get());
}
@@ -670,6 +678,13 @@ public class MiniOzoneClusterImpl implements MiniOzoneCluster {
streamBufferSizeUnit.get().toBytes(streamBufferMaxSize.get())));
clientConfig.setStreamBufferFlushSize(Math.round(
streamBufferSizeUnit.get().toBytes(streamBufferFlushSize.get())));
+ clientConfig.setDataStreamBufferFlushSize(Math.round(
+ streamBufferSizeUnit.get().toBytes(dataStreamBufferFlushSize.get())));
+ // toIntExact fails loudly instead of silently wrapping when the byte size
+ // does not fit in an int.
+ clientConfig.setDataStreamMaxBufferSize(Math.toIntExact(Math.round(
+ streamBufferSizeUnit.get()
+ .toBytes(dataStreamMaxBufferSize.getAsInt()))));
conf.setFromObject(clientConfig);
conf.setStorageSize(ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_KEY,
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java
index 05a1019..5eb38a0 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java
@@ -20,7 +20,10 @@ package org.apache.hadoop.ozone.client.rpc;
import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.hdds.scm.XceiverClientManager;
+import org.apache.hadoop.hdds.scm.XceiverClientMetrics;
import org.apache.hadoop.hdds.scm.storage.BlockDataStreamOutput;
import org.apache.hadoop.hdds.scm.storage.ByteBufferStreamOutput;
import org.apache.hadoop.ozone.MiniOzoneCluster;
@@ -101,6 +104,8 @@ public class TestBlockDataStreamOutput {
.setChunkSize(chunkSize)
.setStreamBufferFlushSize(flushSize)
.setStreamBufferMaxSize(maxFlushSize)
+ .setDataStreamBufferFlushize(maxFlushSize)
+ .setDataStreamBufferMaxSize(chunkSize)
.setStreamBufferSizeUnit(StorageUnit.BYTES)
.build();
cluster.waitForClusterToBeReady();
@@ -186,6 +191,33 @@ public class TestBlockDataStreamOutput {
validateData(keyName, dataString.concat(dataString).getBytes(UTF_8));
}
+ @Test
+ public void testPutBlockAtBoundary() throws Exception {
+ int dataLength = 500;
+ XceiverClientMetrics metrics =
+ XceiverClientManager.getXceiverClientMetrics();
+ long putBlockCount = metrics.getContainerOpCountMetrics(
+ ContainerProtos.Type.PutBlock);
+ long pendingPutBlockCount = metrics.getPendingContainerOpCountMetrics(
+ ContainerProtos.Type.PutBlock);
+ String keyName = getKeyName();
+ OzoneDataStreamOutput key = createKey(
+ keyName, ReplicationType.RATIS, 0);
+ byte[] data =
+ ContainerTestHelper.getFixedLengthString(keyString, dataLength)
+ .getBytes(UTF_8);
+ key.write(ByteBuffer.wrap(data));
+ Assert.assertTrue(
+ metrics.getPendingContainerOpCountMetrics(ContainerProtos.Type.PutBlock)
+ <= pendingPutBlockCount + 1);
+ key.close();
+ // Since data length is 500, the first putBlock will be at 400 (the flush
+ // boundary) and the other at 500.
+ Assert.assertEquals(putBlockCount + 2,
+ metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
+ validateData(keyName, data);
+ }
+
private OzoneDataStreamOutput createKey(String keyName, ReplicationType type,
long size) throws Exception {
return TestHelper.createStreamKey(
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org