You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by sz...@apache.org on 2022/11/07 18:12:12 UTC
[ozone] 19/40: HDDS-6039. Define a minimum packet size during streaming writes. (#2883)
This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch HDDS-4454
in repository https://gitbox.apache.org/repos/asf/ozone.git
commit 28fdecd8563f989e0ee45f273bcbde1d87742eb8
Author: Sadanand Shenoy <sa...@gmail.com>
AuthorDate: Tue Dec 21 12:22:30 2021 +0530
HDDS-6039. Define a minimum packet size during streaming writes. (#2883)
(cherry picked from commit 1f56a45e7578fdd495e852f0e449e62b1079c640)
(cherry picked from commit 750c5f475a4a03cdb078699d5f5a2254f1f491b0)
(cherry picked from commit 7e61b1f64216639b4cac22125f047fe1c56b5a47)
---
.../apache/hadoop/hdds/scm/OzoneClientConfig.java | 16 ++++++
.../hdds/scm/storage/BlockDataStreamOutput.java | 57 ++++++++++++++++------
.../hadoop/hdds/scm/storage/StreamBuffer.java | 15 +++++-
.../hdds/scm/storage/StreamCommitWatcher.java | 2 +-
.../client/io/BlockDataStreamOutputEntryPool.java | 2 +-
.../org/apache/hadoop/ozone/MiniOzoneCluster.java | 6 +++
.../apache/hadoop/ozone/MiniOzoneClusterImpl.java | 6 +++
.../client/rpc/TestBlockDataStreamOutput.java | 32 ++++++++++--
.../rpc/TestContainerStateMachineStream.java | 1 +
.../client/rpc/TestOzoneRpcClientAbstract.java | 1 +
10 files changed, 118 insertions(+), 20 deletions(-)
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java
index 715a19212b..86a725220a 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java
@@ -81,6 +81,14 @@ public class OzoneClientConfig {
tags = ConfigTag.CLIENT)
private long dataStreamBufferFlushSize = 16 * 1024 * 1024;
+ @Config(key = "datastream.min.packet.size",
+ defaultValue = "1MB",
+ type = ConfigType.SIZE,
+ description = "The minimum size of a data packet "
+ + "(used via ratis streaming)",
+ tags = ConfigTag.CLIENT)
+ private int dataStreamMinPacketSize = 1024 * 1024;
+
@Config(key = "stream.buffer.increment",
defaultValue = "0B",
type = ConfigType.SIZE,
@@ -267,6 +275,14 @@ public class OzoneClientConfig {
this.streamBufferMaxSize = streamBufferMaxSize;
}
+ public int getDataStreamMinPacketSize() {
+ return dataStreamMinPacketSize;
+ }
+
+ public void setDataStreamMinPacketSize(int dataStreamMinPacketSize) {
+ this.dataStreamMinPacketSize = dataStreamMinPacketSize;
+ }
+
public int getMaxRetryCount() {
return maxRetryCount;
}
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
index 6f5a54354a..9fb1340527 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
@@ -29,6 +29,8 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.KeyValue;
import org.apache.hadoop.hdds.ratis.ContainerCommandRequestMessage;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import org.apache.hadoop.hdds.scm.XceiverClientManager;
+import org.apache.hadoop.hdds.scm.XceiverClientMetrics;
import org.apache.hadoop.hdds.scm.XceiverClientRatis;
import org.apache.hadoop.hdds.scm.XceiverClientReply;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
@@ -125,7 +127,8 @@ public class BlockDataStreamOutput implements ByteBufferStreamOutput {
private List<CompletableFuture<DataStreamReply>> futures = new ArrayList<>();
private final long syncSize = 0; // TODO: disk sync is disabled for now
private long syncPosition = 0;
-
+ private StreamBuffer currentBuffer;
+ private XceiverClientMetrics metrics;
/**
* Creates a new BlockDataStreamOutput.
*
@@ -172,6 +175,7 @@ public class BlockDataStreamOutput implements ByteBufferStreamOutput {
ioException = new AtomicReference<>(null);
checksum = new Checksum(config.getChecksumType(),
config.getBytesPerChecksum());
+ metrics = XceiverClientManager.getXceiverClientMetrics();
}
private DataStreamOutput setupStream(Pipeline pipeline) throws IOException {
@@ -257,27 +261,47 @@ public class BlockDataStreamOutput implements ByteBufferStreamOutput {
if (len == 0) {
return;
}
- int curLen = len;
- // set limit on the number of bytes that a ByteBuffer(StreamBuffer) can hold
- int maxBufferLen = config.getDataStreamMaxBufferSize();
- while (curLen > 0) {
- int writeLen = Math.min(curLen, maxBufferLen);
+ while (len > 0) {
+ allocateNewBufferIfNeeded();
+ int writeLen = Math.min(len, currentBuffer.length());
final StreamBuffer buf = new StreamBuffer(b, off, writeLen);
+ currentBuffer.put(buf);
+ writeChunkIfNeeded();
off += writeLen;
- bufferList.add(buf);
- writeChunkToContainer(buf.duplicate());
- curLen -= writeLen;
writtenDataLength += writeLen;
+ len -= writeLen;
doFlushIfNeeded();
}
}
+ private void writeChunkIfNeeded() throws IOException {
+ if (currentBuffer.length()==0) {
+ writeChunk(currentBuffer);
+ currentBuffer = null;
+ }
+ }
+
+ private void writeChunk(StreamBuffer sb) throws IOException {
+ bufferList.add(sb);
+ ByteBuffer dup = sb.duplicate();
+ dup.position(0);
+ dup.limit(sb.position());
+ writeChunkToContainer(dup);
+ }
+
+ private void allocateNewBufferIfNeeded() {
+ if (currentBuffer==null) {
+ currentBuffer =
+ StreamBuffer.allocate(config.getDataStreamMinPacketSize());
+ }
+ }
+
private void doFlushIfNeeded() throws IOException {
Preconditions.checkArgument(config.getDataStreamBufferFlushSize() > config
.getDataStreamMaxBufferSize());
long boundary = config.getDataStreamBufferFlushSize() / config
.getDataStreamMaxBufferSize();
- if (bufferList.size() % boundary == 0) {
+ if (!bufferList.isEmpty() && bufferList.size() % boundary == 0) {
updateFlushLength();
executePutBlock(false, false);
}
@@ -308,11 +332,10 @@ public class BlockDataStreamOutput implements ByteBufferStreamOutput {
int count = 0;
while (len > 0) {
final StreamBuffer buf = bufferList.get(count);
- final long writeLen = Math.min(buf.length(), len);
+ final long writeLen = Math.min(buf.position(), len);
final ByteBuffer duplicated = buf.duplicate();
- if (writeLen != buf.length()) {
- duplicated.limit(Math.toIntExact(len));
- }
+ duplicated.position(0);
+ duplicated.limit(buf.position());
writeChunkToContainer(duplicated);
len -= writeLen;
count++;
@@ -449,6 +472,11 @@ public class BlockDataStreamOutput implements ByteBufferStreamOutput {
// This can be a partially filled chunk. Since we are flushing the buffer
// here, we just limit this buffer to the current position. So that next
// write will happen in new buffer
+
+ if (currentBuffer!=null) {
+ writeChunk(currentBuffer);
+ currentBuffer = null;
+ }
updateFlushLength();
executePutBlock(close, false);
} else if (close) {
@@ -584,6 +612,7 @@ public class BlockDataStreamOutput implements ByteBufferStreamOutput {
.setLen(effectiveChunkSize)
.setChecksumData(checksumData.getProtoBufMessage())
.build();
+ metrics.incrPendingContainerOpsMetrics(ContainerProtos.Type.WriteChunk);
if (LOG.isDebugEnabled()) {
LOG.debug("Writing chunk {} length {} at offset {}",
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBuffer.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBuffer.java
index f36019e2ae..5118ea5ead 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBuffer.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBuffer.java
@@ -27,7 +27,7 @@ public class StreamBuffer {
private final ByteBuffer buffer;
public StreamBuffer(ByteBuffer buffer) {
- this.buffer = buffer.asReadOnlyBuffer();
+ this.buffer = buffer;
}
public StreamBuffer(ByteBuffer buffer, int offset, int length) {
@@ -43,4 +43,17 @@ public class StreamBuffer {
return buffer.limit() - buffer.position();
}
+ public int position() {
+ return buffer.position();
+ }
+
+
+ public void put(StreamBuffer sb){
+ buffer.put(sb.buffer);
+ }
+
+ public static StreamBuffer allocate(int size){
+ return new StreamBuffer(ByteBuffer.allocate(size));
+ }
+
}
\ No newline at end of file
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamCommitWatcher.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamCommitWatcher.java
index 3a59d07571..9ae604e951 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamCommitWatcher.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamCommitWatcher.java
@@ -178,7 +178,7 @@ public class StreamCommitWatcher {
Preconditions.checkState(commitIndexMap.containsKey(index));
final List<StreamBuffer> buffers = commitIndexMap.remove(index);
final long length =
- buffers.stream().mapToLong(StreamBuffer::length).sum();
+ buffers.stream().mapToLong(StreamBuffer::position).sum();
totalAckDataLength += length;
// clear the future object from the future Map
final CompletableFuture<ContainerCommandResponseProto> remove =
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntryPool.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntryPool.java
index e49b0b79ad..24a046f623 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntryPool.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntryPool.java
@@ -309,7 +309,7 @@ public class BlockDataStreamOutputEntryPool {
long computeBufferData() {
long totalDataLen =0;
for (StreamBuffer b : bufferList){
- totalDataLen += b.length();
+ totalDataLen += b.position();
}
return totalDataLen;
}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
index a97ea318e2..a1783a6cb5 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
@@ -328,6 +328,7 @@ public interface MiniOzoneCluster {
protected Optional<Long> dataStreamBufferFlushSize= Optional.empty();
protected OptionalInt dataStreamMaxBufferSize = OptionalInt.empty();
protected Optional<Long> streamBufferMaxSize = Optional.empty();
+ protected OptionalInt dataStreamMinPacketSize = OptionalInt.empty();
protected Optional<Long> blockSize = Optional.empty();
protected Optional<StorageUnit> streamBufferSizeUnit = Optional.empty();
protected boolean includeRecon = false;
@@ -578,6 +579,11 @@ public interface MiniOzoneCluster {
return this;
}
+ public Builder setDataStreamMinPacketSize(int size) {
+ dataStreamMinPacketSize = OptionalInt.of(size);
+ return this;
+ }
+
/**
* Sets the block size for stream buffer.
*
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
index 2c26f9725f..872db06aa5 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
@@ -660,6 +660,9 @@ public class MiniOzoneClusterImpl implements MiniOzoneCluster {
if (!dataStreamMaxBufferSize.isPresent()) {
dataStreamMaxBufferSize = OptionalInt.of(chunkSize.get());
}
+ if (!dataStreamMinPacketSize.isPresent()) {
+ dataStreamMinPacketSize = OptionalInt.of(chunkSize.get()/4);
+ }
if (!blockSize.isPresent()) {
blockSize = Optional.of(2 * streamBufferMaxSize.get());
}
@@ -681,6 +684,9 @@ public class MiniOzoneClusterImpl implements MiniOzoneCluster {
clientConfig.setDataStreamMaxBufferSize((int) Math.round(
streamBufferSizeUnit.get()
.toBytes(dataStreamMaxBufferSize.getAsInt())));
+ clientConfig.setDataStreamMinPacketSize((int) Math.round(
+ streamBufferSizeUnit.get()
+ .toBytes(dataStreamMinPacketSize.getAsInt())));
conf.setFromObject(clientConfig);
conf.setStorageSize(ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_KEY,
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java
index 5eb38a00de..c9242df8b1 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java
@@ -107,6 +107,7 @@ public class TestBlockDataStreamOutput {
.setDataStreamBufferFlushize(maxFlushSize)
.setDataStreamBufferMaxSize(chunkSize)
.setStreamBufferSizeUnit(StorageUnit.BYTES)
+ .setDataStreamMinPacketSize(2*chunkSize/5)
.build();
cluster.waitForClusterToBeReady();
//the easiest way to create an open container is creating a key
@@ -193,7 +194,7 @@ public class TestBlockDataStreamOutput {
@Test
public void testPutBlockAtBoundary() throws Exception {
- int dataLength = 500;
+ int dataLength = 200;
XceiverClientMetrics metrics =
XceiverClientManager.getXceiverClientMetrics();
long putBlockCount = metrics.getContainerOpCountMetrics(
@@ -211,8 +212,8 @@ public class TestBlockDataStreamOutput {
metrics.getPendingContainerOpCountMetrics(ContainerProtos.Type.PutBlock)
<= pendingPutBlockCount + 1);
key.close();
- // Since data length is 500 , first putBlock will be at 400(flush boundary)
- // and the other at 500
+ // Since the data length is 200, the first putBlock will be at 160 (the
+ // flush boundary) and the other at 200
Assert.assertTrue(
metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)
== putBlockCount + 2);
@@ -230,4 +231,29 @@ public class TestBlockDataStreamOutput {
.validateData(keyName, data, objectStore, volumeName, bucketName);
}
+
+ @Test
+ public void testMinPacketSize() throws Exception {
+ String keyName = getKeyName();
+ XceiverClientMetrics metrics =
+ XceiverClientManager.getXceiverClientMetrics();
+ OzoneDataStreamOutput key = createKey(keyName, ReplicationType.RATIS, 0);
+ long writeChunkCount =
+ metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk);
+ byte[] data =
+ ContainerTestHelper.getFixedLengthString(keyString, chunkSize / 5)
+ .getBytes(UTF_8);
+ key.write(ByteBuffer.wrap(data));
+ // minPacketSize = 40, so the first write of 20 won't trigger a writeChunk
+ Assert.assertEquals(writeChunkCount,
+ metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
+ key.write(ByteBuffer.wrap(data));
+ Assert.assertEquals(writeChunkCount + 1,
+ metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
+ // Now close the stream; it will update the key length.
+ key.close();
+ String dataString = new String(data, UTF_8);
+ validateData(keyName, dataString.concat(dataString).getBytes(UTF_8));
+ }
+
}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineStream.java
index 3b17450376..f4c756bccd 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineStream.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineStream.java
@@ -119,6 +119,7 @@ public class TestContainerStateMachineStream {
conf.setQuietMode(false);
cluster =
MiniOzoneCluster.newBuilder(conf).setNumDatanodes(3).setHbInterval(200)
+ .setDataStreamMinPacketSize(1024)
.build();
cluster.waitForClusterToBeReady();
cluster.waitForPipelineTobeReady(HddsProtos.ReplicationFactor.ONE, 60000);
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
index a6566141d7..13a1f5a76b 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
@@ -203,6 +203,7 @@ public abstract class TestOzoneRpcClientAbstract {
.setTotalPipelineNumLimit(10)
.setScmId(scmId)
.setClusterId(clusterId)
+ .setDataStreamMinPacketSize(1024)
.build();
cluster.waitForClusterToBeReady();
ozClient = OzoneClientFactory.getRpcClient(conf);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org