Posted to commits@ozone.apache.org by sz...@apache.org on 2022/05/06 15:45:24 UTC

[ozone] 19/35: HDDS-6039. Define a minimum packet size during streaming writes. (#2883)

This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch HDDS-4454
in repository https://gitbox.apache.org/repos/asf/ozone.git

commit ac9815d47c89a5e148e46487b9b821ef3f9ed132
Author: Sadanand Shenoy <sa...@gmail.com>
AuthorDate: Tue Dec 21 12:22:30 2021 +0530

    HDDS-6039. Define a minimum packet size during streaming writes. (#2883)
---
 .../apache/hadoop/hdds/scm/OzoneClientConfig.java  | 16 ++++++
 .../hdds/scm/storage/BlockDataStreamOutput.java    | 57 ++++++++++++++++------
 .../hadoop/hdds/scm/storage/StreamBuffer.java      | 15 +++++-
 .../hdds/scm/storage/StreamCommitWatcher.java      |  2 +-
 .../client/io/BlockDataStreamOutputEntryPool.java  |  2 +-
 .../org/apache/hadoop/ozone/MiniOzoneCluster.java  |  6 +++
 .../apache/hadoop/ozone/MiniOzoneClusterImpl.java  |  6 +++
 .../client/rpc/TestBlockDataStreamOutput.java      | 32 ++++++++++--
 .../rpc/TestContainerStateMachineStream.java       |  1 +
 .../client/rpc/TestOzoneRpcClientAbstract.java     |  1 +
 10 files changed, 118 insertions(+), 20 deletions(-)

diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java
index 4fad9df250..5249e53644 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java
@@ -81,6 +81,14 @@ public class OzoneClientConfig {
       tags = ConfigTag.CLIENT)
   private long dataStreamBufferFlushSize = 16 * 1024 * 1024;
 
+  @Config(key = "datastream.min.packet.size",
+      defaultValue = "1MB",
+      type = ConfigType.SIZE,
+      description = "The maximum size of the ByteBuffer "
+          + "(used via ratis streaming)",
+      tags = ConfigTag.CLIENT)
+  private int dataStreamMinPacketSize = 1024 * 1024;
+
   @Config(key = "stream.buffer.increment",
       defaultValue = "0B",
       type = ConfigType.SIZE,
@@ -259,6 +267,14 @@ public class OzoneClientConfig {
     this.streamBufferMaxSize = streamBufferMaxSize;
   }
 
+  public int getDataStreamMinPacketSize() {
+    return dataStreamMinPacketSize;
+  }
+
+  public void setDataStreamMinPacketSize(int dataStreamMinPacketSize) {
+    this.dataStreamMinPacketSize = dataStreamMinPacketSize;
+  }
+
   public int getMaxRetryCount() {
     return maxRetryCount;
   }
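
The new knob is a client-side setting. Below is a minimal sketch of
overriding it programmatically, assuming OzoneClientConfig's usual
"ozone.client" prefix (which would surface the key as
ozone.client.datastream.min.packet.size) and the same
getObject/setFromObject round trip this patch uses in MiniOzoneClusterImpl:

    import org.apache.hadoop.hdds.conf.OzoneConfiguration;
    import org.apache.hadoop.hdds.scm.OzoneClientConfig;

    public class MinPacketSizeConfigExample {
      public static void main(String[] args) {
        OzoneConfiguration conf = new OzoneConfiguration();
        OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
        clientConfig.setDataStreamMinPacketSize(512 * 1024); // 512 KB packets
        conf.setFromObject(clientConfig);
        // Later readers of the config see the overridden value:
        System.out.println(
            conf.getObject(OzoneClientConfig.class).getDataStreamMinPacketSize());
      }
    }
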
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
index 6f5a54354a..9fb1340527 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
@@ -29,6 +29,8 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.KeyValue;
 import org.apache.hadoop.hdds.ratis.ContainerCommandRequestMessage;
 import org.apache.hadoop.hdds.scm.OzoneClientConfig;
 import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import org.apache.hadoop.hdds.scm.XceiverClientManager;
+import org.apache.hadoop.hdds.scm.XceiverClientMetrics;
 import org.apache.hadoop.hdds.scm.XceiverClientRatis;
 import org.apache.hadoop.hdds.scm.XceiverClientReply;
 import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
@@ -125,7 +127,8 @@ public class BlockDataStreamOutput implements ByteBufferStreamOutput {
   private List<CompletableFuture<DataStreamReply>> futures = new ArrayList<>();
   private final long syncSize = 0; // TODO: disk sync is disabled for now
   private long syncPosition = 0;
-
+  private StreamBuffer currentBuffer;
+  private XceiverClientMetrics metrics;
   /**
    * Creates a new BlockDataStreamOutput.
    *
@@ -172,6 +175,7 @@ public class BlockDataStreamOutput implements ByteBufferStreamOutput {
     ioException = new AtomicReference<>(null);
     checksum = new Checksum(config.getChecksumType(),
         config.getBytesPerChecksum());
+    metrics = XceiverClientManager.getXceiverClientMetrics();
   }
 
   private DataStreamOutput setupStream(Pipeline pipeline) throws IOException {
@@ -257,27 +261,47 @@ public class BlockDataStreamOutput implements ByteBufferStreamOutput {
     if (len == 0) {
       return;
     }
-    int curLen = len;
-    // set limit on the number of bytes that a ByteBuffer(StreamBuffer) can hold
-    int maxBufferLen = config.getDataStreamMaxBufferSize();
-    while (curLen > 0) {
-      int writeLen = Math.min(curLen, maxBufferLen);
+    while (len > 0) {
+      allocateNewBufferIfNeeded();
+      int writeLen = Math.min(len, currentBuffer.length());
       final StreamBuffer buf = new StreamBuffer(b, off, writeLen);
+      currentBuffer.put(buf);
+      writeChunkIfNeeded();
       off += writeLen;
-      bufferList.add(buf);
-      writeChunkToContainer(buf.duplicate());
-      curLen -= writeLen;
       writtenDataLength += writeLen;
+      len -= writeLen;
       doFlushIfNeeded();
     }
   }
 
+  private void writeChunkIfNeeded() throws IOException {
+    if (currentBuffer.length() == 0) {
+      writeChunk(currentBuffer);
+      currentBuffer = null;
+    }
+  }
+
+  private void writeChunk(StreamBuffer sb) throws IOException {
+    bufferList.add(sb);
+    ByteBuffer dup = sb.duplicate();
+    dup.position(0);
+    dup.limit(sb.position());
+    writeChunkToContainer(dup);
+  }
+
+  private void allocateNewBufferIfNeeded() {
+    if (currentBuffer == null) {
+      currentBuffer =
+          StreamBuffer.allocate(config.getDataStreamMinPacketSize());
+    }
+  }
+
   private void doFlushIfNeeded() throws IOException {
     Preconditions.checkArgument(config.getDataStreamBufferFlushSize() > config
         .getDataStreamMaxBufferSize());
     long boundary = config.getDataStreamBufferFlushSize() / config
         .getDataStreamMaxBufferSize();
-    if (bufferList.size() % boundary == 0) {
+    if (!bufferList.isEmpty() && bufferList.size() % boundary == 0) {
       updateFlushLength();
       executePutBlock(false, false);
     }
@@ -308,11 +332,10 @@ public class BlockDataStreamOutput implements ByteBufferStreamOutput {
     int count = 0;
     while (len > 0) {
       final StreamBuffer buf = bufferList.get(count);
-      final long writeLen = Math.min(buf.length(), len);
+      final long writeLen = Math.min(buf.position(), len);
       final ByteBuffer duplicated = buf.duplicate();
-      if (writeLen != buf.length()) {
-        duplicated.limit(Math.toIntExact(len));
-      }
+      duplicated.position(0);
+      duplicated.limit(buf.position());
       writeChunkToContainer(duplicated);
       len -= writeLen;
       count++;
@@ -449,6 +472,11 @@ public class BlockDataStreamOutput implements ByteBufferStreamOutput {
       // This can be a partially filled chunk. Since we are flushing the buffer
       // here, we just limit this buffer to the current position. So that next
       // write will happen in new buffer
+
+      if (currentBuffer != null) {
+        writeChunk(currentBuffer);
+        currentBuffer = null;
+      }
       updateFlushLength();
       executePutBlock(close, false);
     } else if (close) {
@@ -584,6 +612,7 @@ public class BlockDataStreamOutput implements ByteBufferStreamOutput {
         .setLen(effectiveChunkSize)
         .setChecksumData(checksumData.getProtoBufMessage())
         .build();
+    metrics.incrPendingContainerOpsMetrics(ContainerProtos.Type.WriteChunk);
 
     if (LOG.isDebugEnabled()) {
       LOG.debug("Writing chunk {} length {} at offset {}",
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBuffer.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBuffer.java
index f36019e2ae..5118ea5ead 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBuffer.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBuffer.java
@@ -27,7 +27,7 @@ public class StreamBuffer {
   private final ByteBuffer buffer;
 
   public StreamBuffer(ByteBuffer buffer) {
-    this.buffer = buffer.asReadOnlyBuffer();
+    this.buffer = buffer;
   }
 
   public StreamBuffer(ByteBuffer buffer, int offset, int length) {
@@ -43,4 +43,17 @@ public class StreamBuffer {
     return buffer.limit() - buffer.position();
   }
 
+  public int position() {
+    return buffer.position();
+  }
+
+
+  public void put(StreamBuffer sb) {
+    buffer.put(sb.buffer);
+  }
+
+  public static StreamBuffer allocate(int size) {
+    return new StreamBuffer(ByteBuffer.allocate(size));
+  }
+
 }
\ No newline at end of file
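
Dropping the asReadOnlyBuffer() wrapper is what makes the new put() legal: a
read-only ByteBuffer throws ReadOnlyBufferException on put(). The rest of
the patch leans on the distinction between position() (bytes written so far)
and length() (limit minus position, i.e. remaining space). A short
illustration of those semantics on a plain ByteBuffer:

    import java.nio.ByteBuffer;

    public class StreamBufferSemantics {
      public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.allocate(40); // like StreamBuffer.allocate(40)
        buffer.put(new byte[25]);                    // like put() of a 25-byte write
        System.out.println("written: " + buffer.position());                 // 25
        System.out.println("free: " + (buffer.limit() - buffer.position())); // 15
      }
    }
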
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamCommitWatcher.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamCommitWatcher.java
index 3a59d07571..9ae604e951 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamCommitWatcher.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamCommitWatcher.java
@@ -178,7 +178,7 @@ public class StreamCommitWatcher {
       Preconditions.checkState(commitIndexMap.containsKey(index));
       final List<StreamBuffer> buffers = commitIndexMap.remove(index);
       final long length =
-          buffers.stream().mapToLong(StreamBuffer::length).sum();
+          buffers.stream().mapToLong(StreamBuffer::position).sum();
       totalAckDataLength += length;
       // clear the future object from the future Map
       final CompletableFuture<ContainerCommandResponseProto> remove =
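
With buffers now possibly partial at commit time, the acknowledged byte
count must sum position() (bytes actually written) rather than length(),
which after this patch measures the unused tail of each buffer. A toy
version of that accounting, using plain ByteBuffers in place of the
commitIndexMap entries:

    import java.nio.ByteBuffer;
    import java.util.List;

    public class AckAccounting {
      public static void main(String[] args) {
        ByteBuffer full = ByteBuffer.allocate(40);
        full.position(40);                        // completely filled packet
        ByteBuffer partial = ByteBuffer.allocate(40);
        partial.position(25);                     // flushed while partly filled
        long acked = List.of(full, partial).stream()
            .mapToLong(ByteBuffer::position).sum();
        System.out.println(acked); // 65 bytes written; remaining space ignored
      }
    }
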
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntryPool.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntryPool.java
index e49b0b79ad..24a046f623 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntryPool.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntryPool.java
@@ -309,7 +309,7 @@ public class BlockDataStreamOutputEntryPool {
   long computeBufferData() {
     long totalDataLen =0;
     for (StreamBuffer b : bufferList){
-      totalDataLen += b.length();
+      totalDataLen += b.position();
     }
     return totalDataLen;
   }
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
index 501f9ce0fb..5c0e74f80b 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
@@ -326,6 +326,7 @@ public interface MiniOzoneCluster {
     protected Optional<Long> dataStreamBufferFlushSize= Optional.empty();
     protected OptionalInt dataStreamMaxBufferSize  = OptionalInt.empty();
     protected Optional<Long> streamBufferMaxSize = Optional.empty();
+    protected OptionalInt dataStreamMinPacketSize = OptionalInt.empty();
     protected Optional<Long> blockSize = Optional.empty();
     protected Optional<StorageUnit> streamBufferSizeUnit = Optional.empty();
     protected boolean includeRecon = false;
@@ -569,6 +570,11 @@ public interface MiniOzoneCluster {
       return this;
     }
 
+    public Builder setDataStreamMinPacketSize(int size) {
+      dataStreamMinPacketSize = OptionalInt.of(size);
+      return this;
+    }
+
     /**
      * Sets the block size for stream buffer.
      *
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
index 8f75659ce9..ce619aac59 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
@@ -667,6 +667,9 @@ public class MiniOzoneClusterImpl implements MiniOzoneCluster {
       if (!dataStreamMaxBufferSize.isPresent()) {
         dataStreamMaxBufferSize = OptionalInt.of(chunkSize.get());
       }
+      if (!dataStreamMinPacketSize.isPresent()) {
+        dataStreamMinPacketSize = OptionalInt.of(chunkSize.get() / 4);
+      }
       if (!blockSize.isPresent()) {
         blockSize = Optional.of(2 * streamBufferMaxSize.get());
       }
@@ -688,6 +691,9 @@ public class MiniOzoneClusterImpl implements MiniOzoneCluster {
       clientConfig.setDataStreamMaxBufferSize((int) Math.round(
           streamBufferSizeUnit.get()
               .toBytes(dataStreamMaxBufferSize.getAsInt())));
+      clientConfig.setDataStreamMinPacketSize((int) Math.round(
+          streamBufferSizeUnit.get()
+              .toBytes(dataStreamMinPacketSize.getAsInt())));
       conf.setFromObject(clientConfig);
 
       conf.setStorageSize(ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_KEY,
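
Together, the two test-harness changes let a test pick a deterministic
packet size. A hedged sketch of the resulting builder usage follows; the
import locations and surrounding setup are illustrative, not taken from
this patch:

    import org.apache.hadoop.conf.StorageUnit; // location may differ by branch
    import org.apache.hadoop.hdds.conf.OzoneConfiguration;
    import org.apache.hadoop.ozone.MiniOzoneCluster;

    public class MinPacketClusterSetup {
      public static void main(String[] args) throws Exception {
        OzoneConfiguration conf = new OzoneConfiguration();
        MiniOzoneCluster cluster = MiniOzoneCluster.newBuilder(conf)
            .setNumDatanodes(3)
            .setStreamBufferSizeUnit(StorageUnit.BYTES)
            .setDataStreamMinPacketSize(1024) // 1 KB packets for the test
            .build();
        cluster.waitForClusterToBeReady();
        // ... exercise streaming writes, then:
        cluster.shutdown();
      }
    }
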
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java
index 5eb38a00de..c9242df8b1 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java
@@ -107,6 +107,7 @@ public class TestBlockDataStreamOutput {
         .setDataStreamBufferFlushize(maxFlushSize)
         .setDataStreamBufferMaxSize(chunkSize)
         .setStreamBufferSizeUnit(StorageUnit.BYTES)
+        .setDataStreamMinPacketSize(2 * chunkSize / 5)
         .build();
     cluster.waitForClusterToBeReady();
     //the easiest way to create an open container is creating a key
@@ -193,7 +194,7 @@ public class TestBlockDataStreamOutput {
 
   @Test
   public void testPutBlockAtBoundary() throws Exception {
-    int dataLength = 500;
+    int dataLength = 200;
     XceiverClientMetrics metrics =
         XceiverClientManager.getXceiverClientMetrics();
     long putBlockCount = metrics.getContainerOpCountMetrics(
@@ -211,8 +212,8 @@ public class TestBlockDataStreamOutput {
         metrics.getPendingContainerOpCountMetrics(ContainerProtos.Type.PutBlock)
             <= pendingPutBlockCount + 1);
     key.close();
-    // Since data length is 500 , first putBlock will be at 400(flush boundary)
-    // and the other at 500
+    // Since data length is 200, the first putBlock will be at 160 (the flush
+    // boundary) and the second at 200.
     Assert.assertTrue(
         metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)
             == putBlockCount + 2);
@@ -230,4 +231,29 @@ public class TestBlockDataStreamOutput {
         .validateData(keyName, data, objectStore, volumeName, bucketName);
   }
 
+
+  @Test
+  public void testMinPacketSize() throws Exception {
+    String keyName = getKeyName();
+    XceiverClientMetrics metrics =
+        XceiverClientManager.getXceiverClientMetrics();
+    OzoneDataStreamOutput key = createKey(keyName, ReplicationType.RATIS, 0);
+    long writeChunkCount =
+        metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk);
+    byte[] data =
+        ContainerTestHelper.getFixedLengthString(keyString, chunkSize / 5)
+            .getBytes(UTF_8);
+    key.write(ByteBuffer.wrap(data));
+    // minPacketSize = 40, so the first 20-byte write won't trigger a writeChunk
+    Assert.assertEquals(writeChunkCount,
+        metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
+    key.write(ByteBuffer.wrap(data));
+    Assert.assertEquals(writeChunkCount + 1,
+        metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
+    // now close the stream; it will update the key length.
+    key.close();
+    String dataString = new String(data, UTF_8);
+    validateData(keyName, dataString.concat(dataString).getBytes(UTF_8));
+  }
+
 }
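
The arithmetic behind testMinPacketSize: with minPacketSize at 40 and each
write at 20 bytes (per the in-test comment), the first write only half fills
the packet buffer, and the second fills it exactly, producing the single
writeChunk the assertions count. Feeding the same numbers through the
PacketBatcher sketch shown earlier reproduces that behaviour:

    public class MinPacketSizeWalkthrough {
      public static void main(String[] args) {
        PacketBatcher batcher = new PacketBatcher(40); // the sketch from above
        batcher.write(new byte[20], 0, 20); // 20 of 40 buffered, nothing sent
        batcher.write(new byte[20], 0, 20); // buffer full at 40 -> one packet
        batcher.flush();                    // already drained, nothing extra
      }
    }
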
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineStream.java
index 3b17450376..f4c756bccd 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineStream.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineStream.java
@@ -119,6 +119,7 @@ public class TestContainerStateMachineStream {
     conf.setQuietMode(false);
     cluster =
         MiniOzoneCluster.newBuilder(conf).setNumDatanodes(3).setHbInterval(200)
+            .setDataStreamMinPacketSize(1024)
             .build();
     cluster.waitForClusterToBeReady();
     cluster.waitForPipelineTobeReady(HddsProtos.ReplicationFactor.ONE, 60000);
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
index fb50b81f18..574fed7a5b 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
@@ -200,6 +200,7 @@ public abstract class TestOzoneRpcClientAbstract {
         .setTotalPipelineNumLimit(10)
         .setScmId(scmId)
         .setClusterId(clusterId)
+        .setDataStreamMinPacketSize(1024)
         .build();
     cluster.waitForClusterToBeReady();
     ozClient = OzoneClientFactory.getRpcClient(conf);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org