You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by sz...@apache.org on 2022/05/24 16:27:36 UTC

[ozone] 18/36: HDDS-5851. [Ozone-Streaming] Define a PutBlock/maxBuffer fixed boundary for streaming writes. (#2866)

This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch HDDS-4454
in repository https://gitbox.apache.org/repos/asf/ozone.git

commit 2ef3b47fcffd95cd18f932ac380156fa1f1cd1ab
Author: Sadanand Shenoy <sa...@gmail.com>
AuthorDate: Wed Dec 1 11:00:46 2021 +0530

    HDDS-5851. [Ozone-Streaming] Define a PutBlock/maxBuffer fixed boundary for streaming writes. (#2866)
---
 .../apache/hadoop/hdds/scm/OzoneClientConfig.java  | 31 ++++++++++++++++++++
 .../hdds/scm/storage/BlockDataStreamOutput.java    | 29 ++++++++++++++----
 .../org/apache/hadoop/ozone/MiniOzoneCluster.java  | 12 ++++++++
 .../apache/hadoop/ozone/MiniOzoneClusterImpl.java  | 11 +++++++
 .../client/rpc/TestBlockDataStreamOutput.java      | 34 ++++++++++++++++++++++
 5 files changed, 111 insertions(+), 6 deletions(-)

diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java
index 63dd511596..4fad9df250 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java
@@ -66,6 +66,21 @@ public class OzoneClientConfig {
       tags = ConfigTag.CLIENT)
   private int streamBufferSize = 4 * 1024 * 1024;
 
+  @Config(key = "datastream.max.buffer.size",
+      defaultValue = "4MB",
+      type = ConfigType.SIZE,
+      description = "The maximum size of the ByteBuffer "
+          + "(used via ratis streaming)",
+      tags = ConfigTag.CLIENT)
+  private int dataStreamMaxBufferSize = 4 * 1024 * 1024;
+
+  @Config(key = "datastream.buffer.flush.size",
+      defaultValue = "16MB",
+      type = ConfigType.SIZE,
+      description = "The boundary at which putBlock is executed",
+      tags = ConfigTag.CLIENT)
+  private long dataStreamBufferFlushSize = 16 * 1024 * 1024;
+
   @Config(key = "stream.buffer.increment",
       defaultValue = "0B",
       type = ConfigType.SIZE,
@@ -220,6 +235,14 @@ public class OzoneClientConfig {
     this.streamBufferSize = streamBufferSize;
   }
 
+  public int getDataStreamMaxBufferSize() {
+    return dataStreamMaxBufferSize;
+  }
+
+  public void setDataStreamMaxBufferSize(int dataStreamMaxBufferSize) {
+    this.dataStreamMaxBufferSize = dataStreamMaxBufferSize;
+  }
+
   public boolean isStreamBufferFlushDelay() {
     return streamBufferFlushDelay;
   }
@@ -288,6 +311,14 @@ public class OzoneClientConfig {
     return bufferIncrement;
   }
 
+  public long getDataStreamBufferFlushSize() {
+    return dataStreamBufferFlushSize;
+  }
+
+  public void setDataStreamBufferFlushSize(long dataStreamBufferFlushSize) {
+    this.dataStreamBufferFlushSize = dataStreamBufferFlushSize;
+  }
+
   public ChecksumCombineMode getChecksumCombineMode() {
     try {
       return ChecksumCombineMode.valueOf(checksumCombineMode);
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
index aada48e2f5..6f5a54354a 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
@@ -257,13 +257,30 @@ public class BlockDataStreamOutput implements ByteBufferStreamOutput {
     if (len == 0) {
       return;
     }
+    int curLen = len;
+    // set limit on the number of bytes that a ByteBuffer(StreamBuffer) can hold
+    int maxBufferLen = config.getDataStreamMaxBufferSize();
+    while (curLen > 0) {
+      int writeLen = Math.min(curLen, maxBufferLen);
+      final StreamBuffer buf = new StreamBuffer(b, off, writeLen);
+      off += writeLen;
+      bufferList.add(buf);
+      writeChunkToContainer(buf.duplicate());
+      curLen -= writeLen;
+      writtenDataLength += writeLen;
+      doFlushIfNeeded();
+    }
+  }
 
-    final StreamBuffer buf = new StreamBuffer(b, off, len);
-    bufferList.add(buf);
-
-    writeChunkToContainer(buf.duplicate());
-
-    writtenDataLength += len;
+  private void doFlushIfNeeded() throws IOException {
+    Preconditions.checkArgument(config.getDataStreamBufferFlushSize() > config
+        .getDataStreamMaxBufferSize());
+    long boundary = config.getDataStreamBufferFlushSize() / config
+        .getDataStreamMaxBufferSize();
+    if (bufferList.size() % boundary == 0) {
+      updateFlushLength();
+      executePutBlock(false, false);
+    }
   }
 
   private void updateFlushLength() {
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
index b1d9a74209..501f9ce0fb 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
@@ -323,6 +323,8 @@ public interface MiniOzoneCluster {
     protected Optional<Integer> chunkSize = Optional.empty();
     protected OptionalInt streamBufferSize = OptionalInt.empty();
     protected Optional<Long> streamBufferFlushSize = Optional.empty();
+    protected Optional<Long> dataStreamBufferFlushSize= Optional.empty();
+    protected OptionalInt dataStreamMaxBufferSize  = OptionalInt.empty();
     protected Optional<Long> streamBufferMaxSize = Optional.empty();
     protected Optional<Long> blockSize = Optional.empty();
     protected Optional<StorageUnit> streamBufferSizeUnit = Optional.empty();
@@ -557,6 +559,16 @@ public interface MiniOzoneCluster {
       return this;
     }
 
+    public Builder setDataStreamBufferMaxSize(int size) {
+      dataStreamMaxBufferSize = OptionalInt.of(size);
+      return this;
+    }
+
+    public Builder setDataStreamBufferFlushize(long size) {
+      dataStreamBufferFlushSize = Optional.of(size);
+      return this;
+    }
+
     /**
      * Sets the block size for stream buffer.
      *
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
index a549767481..8f75659ce9 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
@@ -661,6 +661,12 @@ public class MiniOzoneClusterImpl implements MiniOzoneCluster {
       if (!streamBufferMaxSize.isPresent()) {
         streamBufferMaxSize = Optional.of(2 * streamBufferFlushSize.get());
       }
+      if (!dataStreamBufferFlushSize.isPresent()) {
+        dataStreamBufferFlushSize = Optional.of((long) 4 * chunkSize.get());
+      }
+      if (!dataStreamMaxBufferSize.isPresent()) {
+        dataStreamMaxBufferSize = OptionalInt.of(chunkSize.get());
+      }
       if (!blockSize.isPresent()) {
         blockSize = Optional.of(2 * streamBufferMaxSize.get());
       }
@@ -677,6 +683,11 @@ public class MiniOzoneClusterImpl implements MiniOzoneCluster {
           streamBufferSizeUnit.get().toBytes(streamBufferMaxSize.get())));
       clientConfig.setStreamBufferFlushSize(Math.round(
           streamBufferSizeUnit.get().toBytes(streamBufferFlushSize.get())));
+      clientConfig.setDataStreamBufferFlushSize(Math.round(
+          streamBufferSizeUnit.get().toBytes(dataStreamBufferFlushSize.get())));
+      clientConfig.setDataStreamMaxBufferSize((int) Math.round(
+          streamBufferSizeUnit.get()
+              .toBytes(dataStreamMaxBufferSize.getAsInt())));
       conf.setFromObject(clientConfig);
 
       conf.setStorageSize(ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_KEY,
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java
index 05a101951b..5eb38a00de 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java
@@ -20,7 +20,10 @@ package org.apache.hadoop.ozone.client.rpc;
 import org.apache.hadoop.conf.StorageUnit;
 import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.hdds.scm.XceiverClientManager;
+import org.apache.hadoop.hdds.scm.XceiverClientMetrics;
 import org.apache.hadoop.hdds.scm.storage.BlockDataStreamOutput;
 import org.apache.hadoop.hdds.scm.storage.ByteBufferStreamOutput;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
@@ -101,6 +104,8 @@ public class TestBlockDataStreamOutput {
         .setChunkSize(chunkSize)
         .setStreamBufferFlushSize(flushSize)
         .setStreamBufferMaxSize(maxFlushSize)
+        .setDataStreamBufferFlushize(maxFlushSize)
+        .setDataStreamBufferMaxSize(chunkSize)
         .setStreamBufferSizeUnit(StorageUnit.BYTES)
         .build();
     cluster.waitForClusterToBeReady();
@@ -186,6 +191,35 @@ public class TestBlockDataStreamOutput {
     validateData(keyName, dataString.concat(dataString).getBytes(UTF_8));
   }
 
+  @Test
+  public void testPutBlockAtBoundary() throws Exception {
+    int dataLength = 500;
+    XceiverClientMetrics metrics =
+        XceiverClientManager.getXceiverClientMetrics();
+    long putBlockCount = metrics.getContainerOpCountMetrics(
+        ContainerProtos.Type.PutBlock);
+    long pendingPutBlockCount = metrics.getPendingContainerOpCountMetrics(
+        ContainerProtos.Type.PutBlock);
+    String keyName = getKeyName();
+    OzoneDataStreamOutput key = createKey(
+        keyName, ReplicationType.RATIS, 0);
+    byte[] data =
+        ContainerTestHelper.getFixedLengthString(keyString, dataLength)
+            .getBytes(UTF_8);
+    key.write(ByteBuffer.wrap(data));
+    Assert.assertTrue(
+        metrics.getPendingContainerOpCountMetrics(ContainerProtos.Type.PutBlock)
+            <= pendingPutBlockCount + 1);
+    key.close();
+    // Since data length is 500 , first putBlock will be at 400(flush boundary)
+    // and the other at 500
+    Assert.assertTrue(
+        metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)
+            == putBlockCount + 2);
+    validateData(keyName, data);
+  }
+
+
   private OzoneDataStreamOutput createKey(String keyName, ReplicationType type,
       long size) throws Exception {
     return TestHelper.createStreamKey(


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org