You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by sz...@apache.org on 2022/03/15 06:56:51 UTC
[ozone] 26/30: HDDS-5487. [Ozone-Streaming] BlockDataStreamOutput support FlushDelay. (#3002)
This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch HDDS-4454
in repository https://gitbox.apache.org/repos/asf/ozone.git
commit 9134b73135495c994126f710000d60b761098a00
Author: micah zhao <mi...@tencent.com>
AuthorDate: Mon Feb 14 20:58:11 2022 +0800
HDDS-5487. [Ozone-Streaming] BlockDataStreamOutput support FlushDelay. (#3002)
---
.../apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java | 10 +++++++++-
.../hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java | 1 -
2 files changed, 9 insertions(+), 2 deletions(-)
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
index a3fe1c2..9ac4330 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
@@ -429,7 +429,7 @@ public class BlockDataStreamOutput implements ByteBufferStreamOutput {
} else {
byteBufferList = null;
}
- flush();
+ waitFuturesComplete();
if (close) {
dataStreamCloseReply = out.closeAsync();
}
@@ -485,8 +485,16 @@ public class BlockDataStreamOutput implements ByteBufferStreamOutput {
@Override
public void flush() throws IOException {
+ if (xceiverClientFactory != null && xceiverClient != null
+ && !config.isStreamBufferFlushDelay()) {
+ waitFuturesComplete();
+ }
+ }
+
+ public void waitFuturesComplete() throws IOException {
try {
CompletableFuture.allOf(futures.toArray(EMPTY_FUTURE_ARRAY)).get();
+ futures.clear();
} catch (Exception e) {
LOG.warn("Failed to write all chunks through stream: " + e);
throw new IOException(e);
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java
index 696ab92..2100337 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java
@@ -89,7 +89,6 @@ public class TestBlockDataStreamOutput {
blockSize = 2 * maxFlushSize;
OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
- clientConfig.setStreamBufferFlushDelay(false);
conf.setFromObject(clientConfig);
conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org