You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by sz...@apache.org on 2022/03/24 12:41:45 UTC
[ozone] 24/31: HDDS-6138.[Ozone-Streaming] Define a limit on the size of the retry bufferList. (#2946)
This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch HDDS-4454
in repository https://gitbox.apache.org/repos/asf/ozone.git
commit d290fba027a7531ba627f0ebf01e09bd48575522
Author: Sadanand Shenoy <sa...@gmail.com>
AuthorDate: Thu Feb 10 14:36:03 2022 +0530
HDDS-6138.[Ozone-Streaming] Define a limit on the size of the retry bufferList. (#2946)
---
.../apache/hadoop/hdds/scm/OzoneClientConfig.java | 33 +++++++++---------
.../apache/hadoop/hdds/scm/XceiverClientRatis.java | 2 +-
.../hdds/scm/storage/BlockDataStreamOutput.java | 40 +++++++++++++++++-----
.../org/apache/hadoop/ozone/MiniOzoneCluster.java | 12 +++----
.../apache/hadoop/ozone/MiniOzoneClusterImpl.java | 11 +++---
.../client/rpc/TestBlockDataStreamOutput.java | 14 ++++----
6 files changed, 67 insertions(+), 45 deletions(-)
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java
index ad80b2f..d793d93 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java
@@ -66,14 +66,6 @@ public class OzoneClientConfig {
tags = ConfigTag.CLIENT)
private int streamBufferSize = 4 * 1024 * 1024;
- @Config(key = "datastream.max.buffer.size",
- defaultValue = "4MB",
- type = ConfigType.SIZE,
- description = "The maximum size of the ByteBuffer "
- + "(used via ratis streaming)",
- tags = ConfigTag.CLIENT)
- private int dataStreamMaxBufferSize = 4 * 1024 * 1024;
-
@Config(key = "datastream.buffer.flush.size",
defaultValue = "16MB",
type = ConfigType.SIZE,
@@ -89,6 +81,15 @@ public class OzoneClientConfig {
tags = ConfigTag.CLIENT)
private int dataStreamMinPacketSize = 1024 * 1024;
+ @Config(key = "datastream.window.size",
+ defaultValue = "64MB",
+ type = ConfigType.SIZE,
+ description = "Maximum size of BufferList(used for retry) size per " +
+ "BlockDataStreamOutput instance",
+ tags = ConfigTag.CLIENT)
+ private long streamWindowSize = 64 * 1024 * 1024;
+
+
@Config(key = "stream.buffer.increment",
defaultValue = "0B",
type = ConfigType.SIZE,
@@ -217,14 +218,6 @@ public class OzoneClientConfig {
this.streamBufferSize = streamBufferSize;
}
- public int getDataStreamMaxBufferSize() {
- return dataStreamMaxBufferSize;
- }
-
- public void setDataStreamMaxBufferSize(int dataStreamMaxBufferSize) {
- this.dataStreamMaxBufferSize = dataStreamMaxBufferSize;
- }
-
public boolean isStreamBufferFlushDelay() {
return streamBufferFlushDelay;
}
@@ -249,6 +242,14 @@ public class OzoneClientConfig {
this.dataStreamMinPacketSize = dataStreamMinPacketSize;
}
+ public long getStreamWindowSize() {
+ return streamWindowSize;
+ }
+
+ public void setStreamWindowSize(long streamWindowSize) {
+ this.streamWindowSize = streamWindowSize;
+ }
+
public int getMaxRetryCount() {
return maxRetryCount;
}
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
index f37cd1c..2ad6b69 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
@@ -122,7 +122,7 @@ public final class XceiverClientRatis extends XceiverClientSpi {
this.ozoneConfiguration = configuration;
}
- private void updateCommitInfosMap(
+ public void updateCommitInfosMap(
Collection<RaftProtos.CommitInfoProto> commitInfoProtos) {
// if the commitInfo map is empty, just update the commit indexes for each
// of the servers
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
index ec925d1..a3fe1c2 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
@@ -52,7 +52,9 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.LinkedList;
import java.util.List;
+import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
@@ -116,9 +118,9 @@ public class BlockDataStreamOutput implements ByteBufferStreamOutput {
// Also, corresponding to the logIndex, the corresponding list of buffers will
// be released from the buffer pool.
private final StreamCommitWatcher commitWatcher;
- private final AtomicReference<CompletableFuture<
- ContainerCommandResponseProto>> putBlockFuture
- = new AtomicReference<>(CompletableFuture.completedFuture(null));
+
+ private Queue<CompletableFuture<ContainerCommandResponseProto>>
+ putBlockFutures = new LinkedList<>();
private final List<DatanodeDetails> failedServers;
private final Checksum checksum;
@@ -307,14 +309,33 @@ public class BlockDataStreamOutput implements ByteBufferStreamOutput {
}
private void doFlushIfNeeded() throws IOException {
- Preconditions.checkArgument(config.getDataStreamBufferFlushSize() > config
- .getDataStreamMaxBufferSize());
long boundary = config.getDataStreamBufferFlushSize() / config
- .getDataStreamMaxBufferSize();
+ .getDataStreamMinPacketSize();
+ // streamWindow: max number of buffers allowed in bufferList. When
+ // the size reaches it, wait for the oldest putBlock response and
+ // then watchForCommit(true), like bufferFull in the async path.
+ // NOTE(review): exact == check below; if bufferList can pass
+ // streamWindow without shrinking, confirm whether >= is needed.
+ long streamWindow = config.getStreamWindowSize() / config
+ .getDataStreamMinPacketSize();
if (!bufferList.isEmpty() && bufferList.size() % boundary == 0) {
updateFlushLength();
executePutBlock(false, false);
}
+ if (bufferList.size()==streamWindow){
+ try {
+ checkOpen();
+ if (!putBlockFutures.isEmpty()) {
+ putBlockFutures.remove().get();
+ }
+ } catch (ExecutionException e) {
+ handleExecutionException(e);
+ } catch (InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ handleInterruptedException(ex, true);
+ }
+ watchForCommit(true);
+ }
}
private void updateFlushLength() {
@@ -453,8 +474,7 @@ public class BlockDataStreamOutput implements ByteBufferStreamOutput {
setIoException(ce);
throw ce;
});
- putBlockFuture.updateAndGet(f -> f.thenCombine(flushFuture,
- (previous, current) -> current));
+ putBlockFutures.add(flushFuture);
} catch (IOException | ExecutionException e) {
throw new IOException(EXCEPTION_MSG + e.toString(), e);
} catch (InterruptedException ex) {
@@ -496,7 +516,7 @@ public class BlockDataStreamOutput implements ByteBufferStreamOutput {
// data since latest flush - we need to send the "EOF" flag
executePutBlock(true, true);
}
- putBlockFuture.get().get();
+ CompletableFuture.allOf(putBlockFutures.toArray(EMPTY_FUTURE_ARRAY)).get();
watchForCommit(false);
// just check again if the exception is hit while waiting for the
// futures to ensure flush has indeed succeeded
@@ -638,6 +658,8 @@ public class BlockDataStreamOutput implements ByteBufferStreamOutput {
CompletionException ce = new CompletionException(msg, e);
setIoException(ce);
throw ce;
+ } else if (r.isSuccess()) {
+ xceiverClient.updateCommitInfosMap(r.getCommitInfos());
}
}, responseExecutor);
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
index 5c0e74f..dc7a59b 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
@@ -324,7 +324,7 @@ public interface MiniOzoneCluster {
protected OptionalInt streamBufferSize = OptionalInt.empty();
protected Optional<Long> streamBufferFlushSize = Optional.empty();
protected Optional<Long> dataStreamBufferFlushSize= Optional.empty();
- protected OptionalInt dataStreamMaxBufferSize = OptionalInt.empty();
+ protected Optional<Long> datastreamWindowSize= Optional.empty();
protected Optional<Long> streamBufferMaxSize = Optional.empty();
protected OptionalInt dataStreamMinPacketSize = OptionalInt.empty();
protected Optional<Long> blockSize = Optional.empty();
@@ -560,11 +560,6 @@ public interface MiniOzoneCluster {
return this;
}
- public Builder setDataStreamBufferMaxSize(int size) {
- dataStreamMaxBufferSize = OptionalInt.of(size);
- return this;
- }
-
public Builder setDataStreamBufferFlushize(long size) {
dataStreamBufferFlushSize = Optional.of(size);
return this;
@@ -575,6 +570,11 @@ public interface MiniOzoneCluster {
return this;
}
+ public Builder setDataStreamStreamWindowSize(long size) {
+ datastreamWindowSize = Optional.of(size);
+ return this;
+ }
+
/**
* Sets the block size for stream buffer.
*
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
index 7f75bc6..32ee3bb 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
@@ -657,12 +657,12 @@ public class MiniOzoneClusterImpl implements MiniOzoneCluster {
if (!dataStreamBufferFlushSize.isPresent()) {
dataStreamBufferFlushSize = Optional.of((long) 4 * chunkSize.get());
}
- if (!dataStreamMaxBufferSize.isPresent()) {
- dataStreamMaxBufferSize = OptionalInt.of(chunkSize.get());
- }
if (!dataStreamMinPacketSize.isPresent()) {
dataStreamMinPacketSize = OptionalInt.of(chunkSize.get()/4);
}
+ if (!datastreamWindowSize.isPresent()) {
+ datastreamWindowSize = Optional.of((long) 8 * chunkSize.get());
+ }
if (!blockSize.isPresent()) {
blockSize = Optional.of(2 * streamBufferMaxSize.get());
}
@@ -681,12 +681,11 @@ public class MiniOzoneClusterImpl implements MiniOzoneCluster {
streamBufferSizeUnit.get().toBytes(streamBufferFlushSize.get())));
clientConfig.setDataStreamBufferFlushSize(Math.round(
streamBufferSizeUnit.get().toBytes(dataStreamBufferFlushSize.get())));
- clientConfig.setDataStreamMaxBufferSize((int) Math.round(
- streamBufferSizeUnit.get()
- .toBytes(dataStreamMaxBufferSize.getAsInt())));
clientConfig.setDataStreamMinPacketSize((int) Math.round(
streamBufferSizeUnit.get()
.toBytes(dataStreamMinPacketSize.getAsInt())));
+ clientConfig.setStreamWindowSize(Math.round(
+ streamBufferSizeUnit.get().toBytes(datastreamWindowSize.get())));
conf.setFromObject(clientConfig);
conf.setStorageSize(ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_KEY,
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java
index c6a3c32..696ab92 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java
@@ -106,9 +106,9 @@ public class TestBlockDataStreamOutput {
.setStreamBufferFlushSize(flushSize)
.setStreamBufferMaxSize(maxFlushSize)
.setDataStreamBufferFlushize(maxFlushSize)
- .setDataStreamBufferMaxSize(chunkSize)
.setStreamBufferSizeUnit(StorageUnit.BYTES)
- .setDataStreamMinPacketSize(2*chunkSize/5)
+ .setDataStreamMinPacketSize(chunkSize)
+ .setDataStreamStreamWindowSize(5*chunkSize)
.build();
cluster.waitForClusterToBeReady();
//the easiest way to create an open container is creating a key
@@ -195,7 +195,7 @@ public class TestBlockDataStreamOutput {
@Test
public void testPutBlockAtBoundary() throws Exception {
- int dataLength = 200;
+ int dataLength = 500;
XceiverClientMetrics metrics =
XceiverClientManager.getXceiverClientMetrics();
long putBlockCount = metrics.getContainerOpCountMetrics(
@@ -213,8 +213,8 @@ public class TestBlockDataStreamOutput {
metrics.getPendingContainerOpCountMetrics(ContainerProtos.Type.PutBlock)
<= pendingPutBlockCount + 1);
key.close();
- // Since data length is 200 , first putBlock will be at 160(flush boundary)
- // and the other at 200
+ // Since data length is 500 , first putBlock will be at 400(flush boundary)
+ // and the other at 500
Assert.assertTrue(
metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)
== putBlockCount + 2);
@@ -242,10 +242,10 @@ public class TestBlockDataStreamOutput {
long writeChunkCount =
metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk);
byte[] data =
- ContainerTestHelper.getFixedLengthString(keyString, chunkSize / 5)
+ ContainerTestHelper.getFixedLengthString(keyString, chunkSize / 2)
.getBytes(UTF_8);
key.write(ByteBuffer.wrap(data));
- // minPacketSize= 40, so first write of 20 wont trigger a writeChunk
+ // minPacketSize= 100, so first write of 50 wont trigger a writeChunk
Assert.assertEquals(writeChunkCount,
metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
key.write(ByteBuffer.wrap(data));
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org