You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by sh...@apache.org on 2020/04/22 06:59:13 UTC

[hadoop-ozone] branch master updated: HDDS-3155. Improved ozone client flush implementation to make it faster. (#716)

This is an automated email from the ASF dual-hosted git repository.

shashikant pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 2098516  HDDS-3155. Improved ozone client flush implementation to make it faster. (#716)
2098516 is described below

commit 2098516928378418b11f65cf404e3ecebbf46910
Author: micah zhao <mi...@tencent.com>
AuthorDate: Wed Apr 22 14:59:02 2020 +0800

    HDDS-3155. Improved ozone client flush implementation to make it faster. (#716)
---
 .../hadoop/hdds/scm/storage/BlockOutputStream.java |  15 +-
 .../org/apache/hadoop/ozone/OzoneConfigKeys.java   |  11 ++
 .../common/src/main/resources/ozone-default.xml    |   9 ++
 .../ozone/client/io/BlockOutputStreamEntry.java    |  36 ++++-
 .../client/io/BlockOutputStreamEntryPool.java      |   8 +-
 .../hadoop/ozone/client/io/KeyOutputStream.java    |  16 +-
 .../apache/hadoop/ozone/client/rpc/RpcClient.java  |   5 +
 .../ozone/client/rpc/Test2BlockOutputStream.java   | 173 +++++++++++++++++++++
 8 files changed, 258 insertions(+), 15 deletions(-)

diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
index ffabbf3..f15a5e6 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
@@ -85,7 +85,9 @@ public class BlockOutputStream extends OutputStream {
   private final int bytesPerChecksum;
   private int chunkIndex;
   private final AtomicLong chunkOffset = new AtomicLong();
+  private final int streamBufferSize;
   private final long streamBufferFlushSize;
+  private final boolean streamBufferFlushDelay;
   private final long streamBufferMaxSize;
   private final BufferPool bufferPool;
   // The IOException will be set by response handling thread in case there is an
@@ -131,10 +133,10 @@ public class BlockOutputStream extends OutputStream {
   @SuppressWarnings("parameternumber")
   public BlockOutputStream(BlockID blockID,
       XceiverClientManager xceiverClientManager, Pipeline pipeline,
-      long streamBufferFlushSize, long streamBufferMaxSize,
+      int streamBufferSize, long streamBufferFlushSize,
+      boolean streamBufferFlushDelay, long streamBufferMaxSize,
       BufferPool bufferPool, ChecksumType checksumType,
-      int bytesPerChecksum)
-      throws IOException {
+      int bytesPerChecksum) throws IOException {
     this.blockID = new AtomicReference<>(blockID);
     KeyValue keyValue =
         KeyValue.newBuilder().setKey("TYPE").setValue("KEY").build();
@@ -143,8 +145,10 @@ public class BlockOutputStream extends OutputStream {
             .addMetadata(keyValue);
     this.xceiverClientManager = xceiverClientManager;
     this.xceiverClient = xceiverClientManager.acquireClient(pipeline);
+    this.streamBufferSize = streamBufferSize;
     this.streamBufferFlushSize = streamBufferFlushSize;
     this.streamBufferMaxSize = streamBufferMaxSize;
+    this.streamBufferFlushDelay = streamBufferFlushDelay;
     this.bufferPool = bufferPool;
     this.bytesPerChecksum = bytesPerChecksum;
 
@@ -434,7 +438,9 @@ public class BlockOutputStream extends OutputStream {
   @Override
   public void flush() throws IOException {
     if (xceiverClientManager != null && xceiverClient != null
-        && bufferPool != null && bufferPool.getSize() > 0) {
+        && bufferPool != null && bufferPool.getSize() > 0
+        && (!streamBufferFlushDelay ||
+            writtenDataLength - totalDataFlushedLength >= streamBufferSize)) {
       try {
         handleFlush(false);
       } catch (InterruptedException | ExecutionException e) {
@@ -447,7 +453,6 @@ public class BlockOutputStream extends OutputStream {
     }
   }
 
-
   private void writeChunk(ChunkBuffer buffer)
       throws IOException {
     // This data in the buffer will be pushed to datanode and a reference will
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
index 26fa6e2..fe1c440 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
@@ -149,6 +149,17 @@ public final class OzoneConfigKeys {
   public static final TimeDuration OZONE_CLIENT_RETRY_INTERVAL_DEFAULT =
       TimeDuration.valueOf(0, TimeUnit.MILLISECONDS);
 
+  /**
+   * If this value is true, when the client calls the flush() method,
+   * it checks whether the data in the buffer is greater than
+   * OZONE_CLIENT_STREAM_BUFFER_SIZE_DEFAULT. If greater than,
+   * send the data in the buffer to the datanode.
+   * */
+  public static final String OZONE_CLIENT_STREAM_BUFFER_FLUSH_DELAY =
+      "ozone.client.stream.buffer.flush.delay";
+  public static final boolean OOZONE_CLIENT_STREAM_BUFFER_FLUSH_DELAY_DEFAULT =
+      false;
+
   // This defines the overall connection limit for the connection pool used in
   // RestClient.
   public static final String OZONE_REST_CLIENT_HTTP_CONNECTION_MAX =
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 04a61cb..96a1128 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -387,6 +387,15 @@
     </description>
   </property>
   <property>
+    <name>ozone.client.stream.buffer.flush.delay</name>
+    <value>false</value>
+    <tag>OZONE, CLIENT</tag>
+    <description>If set true, when call flush() and determine whether the
+      data in the current buffer is greater than ozone.client.stream.buffer.size.
+      if greater than then send buffer to the datanode.
+    </description>
+  </property>
+  <property>
     <name>ozone.client.stream.buffer.size</name>
     <value>4MB</value>
     <tag>OZONE, CLIENT</tag>
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java
index 40a4a6b..8e1e640 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java
@@ -55,7 +55,9 @@ public final class BlockOutputStreamEntry extends OutputStream {
   private long currentPosition;
   private Token<OzoneBlockTokenIdentifier> token;
 
+  private final int streamBufferSize;
   private final long streamBufferFlushSize;
+  private final boolean streamBufferFlushDelay;
   private final long streamBufferMaxSize;
   private final long watchTimeout;
   private BufferPool bufferPool;
@@ -64,7 +66,8 @@ public final class BlockOutputStreamEntry extends OutputStream {
   private BlockOutputStreamEntry(BlockID blockID, String key,
       XceiverClientManager xceiverClientManager,
       Pipeline pipeline, String requestId, int chunkSize,
-      long length, long streamBufferFlushSize, long streamBufferMaxSize,
+      long length, int streamBufferSize, long streamBufferFlushSize,
+      boolean streamBufferFlushDelay, long streamBufferMaxSize,
       long watchTimeout, BufferPool bufferPool,
       ChecksumType checksumType, int bytesPerChecksum,
       Token<OzoneBlockTokenIdentifier> token) {
@@ -77,7 +80,9 @@ public final class BlockOutputStreamEntry extends OutputStream {
     this.token = token;
     this.length = length;
     this.currentPosition = 0;
+    this.streamBufferSize = streamBufferSize;
     this.streamBufferFlushSize = streamBufferFlushSize;
+    this.streamBufferFlushDelay = streamBufferFlushDelay;
     this.streamBufferMaxSize = streamBufferMaxSize;
     this.watchTimeout = watchTimeout;
     this.bufferPool = bufferPool;
@@ -110,9 +115,9 @@ public final class BlockOutputStreamEntry extends OutputStream {
       }
       this.outputStream =
           new BlockOutputStream(blockID, xceiverClientManager,
-              pipeline, streamBufferFlushSize,
-              streamBufferMaxSize, bufferPool, checksumType,
-              bytesPerChecksum);
+              pipeline, streamBufferSize, streamBufferFlushSize,
+              streamBufferFlushDelay, streamBufferMaxSize, bufferPool,
+              checksumType, bytesPerChecksum);
     }
   }
 
@@ -215,7 +220,9 @@ public final class BlockOutputStreamEntry extends OutputStream {
     private String requestId;
     private int chunkSize;
     private long length;
+    private int  streamBufferSize;
     private long streamBufferFlushSize;
+    private boolean streamBufferFlushDelay;
     private long streamBufferMaxSize;
     private long watchTimeout;
     private BufferPool bufferPool;
@@ -269,11 +276,21 @@ public final class BlockOutputStreamEntry extends OutputStream {
       return this;
     }
 
+    public Builder setStreamBufferSize(int bufferSize) {
+      this.streamBufferSize = bufferSize;
+      return this;
+    }
+
     public Builder setStreamBufferFlushSize(long bufferFlushSize) {
       this.streamBufferFlushSize = bufferFlushSize;
       return this;
     }
 
+    public Builder setStreamBufferFlushDelay(boolean bufferFlushDelay) {
+      this.streamBufferFlushDelay = bufferFlushDelay;
+      return this;
+    }
+
     public Builder setStreamBufferMaxSize(long bufferMaxSize) {
       this.streamBufferMaxSize = bufferMaxSize;
       return this;
@@ -297,7 +314,8 @@ public final class BlockOutputStreamEntry extends OutputStream {
     public BlockOutputStreamEntry build() {
       return new BlockOutputStreamEntry(blockID, key,
           xceiverClientManager, pipeline, requestId, chunkSize,
-          length, streamBufferFlushSize, streamBufferMaxSize, watchTimeout,
+          length, streamBufferSize, streamBufferFlushSize,
+          streamBufferFlushDelay, streamBufferMaxSize, watchTimeout,
           bufferPool, checksumType, bytesPerChecksum, token);
     }
   }
@@ -331,10 +349,18 @@ public final class BlockOutputStreamEntry extends OutputStream {
     return currentPosition;
   }
 
+  public int getStreamBufferSize() {
+    return streamBufferSize;
+  }
+
   public long getStreamBufferFlushSize() {
     return streamBufferFlushSize;
   }
 
+  public boolean getStreamBufferFlushDelay() {
+    return streamBufferFlushDelay;
+  }
+
   public long getStreamBufferMaxSize() {
     return streamBufferMaxSize;
   }
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
index 17683ad..3cab664 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
@@ -61,6 +61,7 @@ public class BlockOutputStreamEntryPool {
   private final String requestID;
   private final int streamBufferSize;
   private final long streamBufferFlushSize;
+  private final boolean streamBufferFlushDelay;
   private final long streamBufferMaxSize;
   private final long watchTimeout;
   private final long blockSize;
@@ -75,7 +76,8 @@ public class BlockOutputStreamEntryPool {
   public BlockOutputStreamEntryPool(OzoneManagerProtocol omClient,
       int chunkSize, String requestId, HddsProtos.ReplicationFactor factor,
       HddsProtos.ReplicationType type,
-      int bufferSize, long bufferFlushSize, long bufferMaxSize,
+      int bufferSize, long bufferFlushSize,
+      boolean bufferFlushDelay, long bufferMaxSize,
       long size, long watchTimeout, ContainerProtos.ChecksumType checksumType,
       int bytesPerChecksum, String uploadID, int partNumber,
       boolean isMultipart, OmKeyInfo info,
@@ -93,6 +95,7 @@ public class BlockOutputStreamEntryPool {
     this.requestID = requestId;
     this.streamBufferSize = bufferSize;
     this.streamBufferFlushSize = bufferFlushSize;
+    this.streamBufferFlushDelay = bufferFlushDelay;
     this.streamBufferMaxSize = bufferMaxSize;
     this.blockSize = size;
     this.watchTimeout = watchTimeout;
@@ -137,6 +140,7 @@ public class BlockOutputStreamEntryPool {
     requestID = null;
     streamBufferSize = 0;
     streamBufferFlushSize = 0;
+    streamBufferFlushDelay = false;
     streamBufferMaxSize = 0;
     bufferPool = new BufferPool(chunkSize, 1);
     watchTimeout = 0;
@@ -188,7 +192,9 @@ public class BlockOutputStreamEntryPool {
             .setRequestId(requestID)
             .setChunkSize(chunkSize)
             .setLength(subKeyInfo.getLength())
+            .setStreamBufferSize(streamBufferSize)
             .setStreamBufferFlushSize(streamBufferFlushSize)
+            .setStreamBufferFlushDelay(streamBufferFlushDelay)
             .setStreamBufferMaxSize(streamBufferMaxSize)
             .setWatchTimeout(watchTimeout)
             .setbufferPool(bufferPool)
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
index ba0ba12..843155c 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
@@ -124,15 +124,16 @@ public class KeyOutputStream extends OutputStream {
       XceiverClientManager xceiverClientManager,
       OzoneManagerProtocol omClient, int chunkSize,
       String requestId, ReplicationFactor factor, ReplicationType type,
-      int bufferSize, long bufferFlushSize, long bufferMaxSize,
-      long size, long watchTimeout,
+      int bufferSize, long bufferFlushSize, boolean isBufferFlushDelay,
+      long bufferMaxSize, long size, long watchTimeout,
       ChecksumType checksumType, int bytesPerChecksum,
       String uploadID, int partNumber, boolean isMultipart,
       int maxRetryCount, long retryInterval) {
     OmKeyInfo info = handler.getKeyInfo();
     blockOutputStreamEntryPool =
         new BlockOutputStreamEntryPool(omClient, chunkSize, requestId, factor,
-            type, bufferSize, bufferFlushSize, bufferMaxSize, size,
+            type, bufferSize, bufferFlushSize, isBufferFlushDelay,
+            bufferMaxSize, size,
             watchTimeout, checksumType, bytesPerChecksum, uploadID, partNumber,
             isMultipart, info, xceiverClientManager, handler.getId());
     // Retrieve the file encryption key info, null if file is not in
@@ -542,6 +543,7 @@ public class KeyOutputStream extends OutputStream {
     private ReplicationFactor factor;
     private int streamBufferSize;
     private long streamBufferFlushSize;
+    private boolean streamBufferFlushDelay;
     private long streamBufferMaxSize;
     private long blockSize;
     private long watchTimeout;
@@ -608,6 +610,11 @@ public class KeyOutputStream extends OutputStream {
       return this;
     }
 
+    public Builder setStreamBufferFlushDelay(boolean isDelay) {
+      this.streamBufferFlushDelay = isDelay;
+      return this;
+    }
+
     public Builder setStreamBufferMaxSize(long size) {
       this.streamBufferMaxSize = size;
       return this;
@@ -646,7 +653,8 @@ public class KeyOutputStream extends OutputStream {
     public KeyOutputStream build() {
       return new KeyOutputStream(openHandler, xceiverManager, omClient,
           chunkSize, requestID, factor, type,
-          streamBufferSize, streamBufferFlushSize, streamBufferMaxSize,
+          streamBufferSize, streamBufferFlushSize, streamBufferFlushDelay,
+          streamBufferMaxSize,
           blockSize, watchTimeout, checksumType,
           bytesPerChecksum, multipartUploadID, multipartNumber, isMultipartKey,
           maxRetryCount, retryInterval);
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
index 1382117..532a7c5 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
@@ -136,6 +136,7 @@ public class RpcClient implements ClientProtocol {
   private final ACLType groupRights;
   private final int streamBufferSize;
   private final long streamBufferFlushSize;
+  private boolean streamBufferFlushDelay;
   private final long streamBufferMaxSize;
   private final long blockSize;
   private final ClientId clientId = ClientId.randomId();
@@ -196,6 +197,9 @@ public class RpcClient implements ClientProtocol {
         .getStorageSize(OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_FLUSH_SIZE,
             OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_FLUSH_SIZE_DEFAULT,
             StorageUnit.BYTES);
+    streamBufferFlushDelay = conf.getBoolean(
+        OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_FLUSH_DELAY,
+        OzoneConfigKeys.OOZONE_CLIENT_STREAM_BUFFER_FLUSH_DELAY_DEFAULT);
     streamBufferMaxSize = (long) conf
         .getStorageSize(OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_MAX_SIZE,
             OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_MAX_SIZE_DEFAULT,
@@ -1198,6 +1202,7 @@ public class RpcClient implements ClientProtocol {
             .setFactor(HddsProtos.ReplicationFactor.valueOf(factor.getValue()))
             .setStreamBufferSize(streamBufferSize)
             .setStreamBufferFlushSize(streamBufferFlushSize)
+            .setStreamBufferFlushDelay(streamBufferFlushDelay)
             .setStreamBufferMaxSize(streamBufferMaxSize)
             .setBlockSize(blockSize)
             .setChecksumType(checksumType)
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/Test2BlockOutputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/Test2BlockOutputStream.java
new file mode 100644
index 0000000..3bb55fa
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/Test2BlockOutputStream.java
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.client.rpc;
+
+import org.apache.hadoop.conf.StorageUnit;
+import org.apache.hadoop.hdds.client.ReplicationType;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.client.ObjectStore;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.client.io.KeyOutputStream;
+import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
+import org.apache.hadoop.ozone.container.ContainerTestHelper;
+import org.apache.hadoop.ozone.container.TestHelper;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_FLUSH_DELAY;
+
+/**
+ * Tests BlockOutputStream class.
+ */
+public class Test2BlockOutputStream {
+  private static MiniOzoneCluster cluster;
+  private static OzoneConfiguration conf = new OzoneConfiguration();
+  private static OzoneClient client;
+  private static ObjectStore objectStore;
+  private static int chunkSize;
+  private static int flushSize;
+  private static int maxFlushSize;
+  private static int blockSize;
+  private static String volumeName;
+  private static String bucketName;
+  private static String keyString;
+
+  /**
+   * Create a MiniDFSCluster for testing.
+   * <p>
+   * Ozone is made active by setting OZONE_ENABLED = true
+   *
+   * @throws IOException
+   */
+  @BeforeClass
+  public static void init() throws Exception {
+    chunkSize = 100;
+    flushSize = 2 * chunkSize;
+    maxFlushSize = 2 * flushSize;
+    blockSize = 2 * maxFlushSize;
+    conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS);
+    conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS);
+    conf.set(OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE, "NONE");
+    conf.setQuietMode(false);
+    conf.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, 4,
+        StorageUnit.MB);
+    conf.setBoolean(OZONE_CLIENT_STREAM_BUFFER_FLUSH_DELAY, true);
+    cluster = MiniOzoneCluster.newBuilder(conf)
+        .setNumDatanodes(7)
+        .setTotalPipelineNumLimit(10)
+        .setBlockSize(blockSize)
+        .setChunkSize(chunkSize)
+        .setStreamBufferFlushSize(flushSize)
+        .setStreamBufferMaxSize(maxFlushSize)
+        .setStreamBufferSizeUnit(StorageUnit.BYTES)
+        .build();
+    cluster.waitForClusterToBeReady();
+    //the easiest way to create an open container is creating a key
+    client = OzoneClientFactory.getRpcClient(conf);
+    objectStore = client.getObjectStore();
+    keyString = UUID.randomUUID().toString();
+    volumeName = "testblockoutputstream";
+    bucketName = volumeName;
+    objectStore.createVolume(volumeName);
+    objectStore.getVolume(volumeName).createBucket(bucketName);
+  }
+
+  private String getKeyName() {
+    return UUID.randomUUID().toString();
+  }
+
+  /**
+   * Shutdown MiniDFSCluster.
+   */
+  @AfterClass
+  public static void shutdown() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testFlushChunkDelay() throws Exception {
+    String keyName1 = getKeyName();
+    OzoneOutputStream key1 = createKey(keyName1, ReplicationType.RATIS, 0);
+
+    byte[] data1 =
+        ContainerTestHelper.getFixedLengthString(keyString, 10)
+            .getBytes(UTF_8);
+    key1.write(data1);
+    key1.flush();
+    KeyOutputStream keyOutputStream = (KeyOutputStream)key1.getOutputStream();
+    Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 1);
+    OutputStream stream = keyOutputStream.getStreamEntries().get(0)
+        .getOutputStream();
+    Assert.assertTrue(stream instanceof BlockOutputStream);
+    BlockOutputStream blockOutputStream = (BlockOutputStream) stream;
+
+    // we have just written data(length 10) less than chunk Size,
+    // at this time we call flush will not  sync data.
+    Assert.assertEquals(0, blockOutputStream.getTotalDataFlushedLength());
+    key1.close();
+    validateData(keyName1, data1);
+
+    String keyName2 = getKeyName();
+    OzoneOutputStream key2 = createKey(keyName2, ReplicationType.RATIS, 0);
+    byte[] data2 =
+            ContainerTestHelper.getFixedLengthString(keyString, 110)
+                    .getBytes(UTF_8);
+    key2.write(data2);
+    key2.flush();
+    keyOutputStream = (KeyOutputStream)key2.getOutputStream();
+    Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 1);
+    stream = keyOutputStream.getStreamEntries().get(0)
+            .getOutputStream();
+    Assert.assertTrue(stream instanceof BlockOutputStream);
+    blockOutputStream = (BlockOutputStream) stream;
+
+    // we have just written data(length 110) greater than chunk Size,
+    // at this time we call flush will sync data.
+    Assert.assertEquals(data2.length,
+            blockOutputStream.getTotalDataFlushedLength());
+    key2.close();
+    validateData(keyName2, data2);
+  }
+
+  private OzoneOutputStream createKey(String keyName, ReplicationType type,
+      long size) throws Exception {
+    return TestHelper
+        .createKey(keyName, type, size, objectStore, volumeName, bucketName);
+  }
+  private void validateData(String keyName, byte[] data) throws Exception {
+    TestHelper
+        .validateData(keyName, data, objectStore, volumeName, bucketName);
+  }
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: ozone-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: ozone-commits-help@hadoop.apache.org