You are viewing a plain text version of this content; the canonical HTML version (with the original hyperlink) is available in the Apache mailing-list archive.
Posted to commits@ozone.apache.org by sz...@apache.org on 2022/06/10 06:40:08 UTC
[ozone] branch HDDS-4454 updated: HDDS-6842. [Ozone-Streaming] Reduce the number of watch requests in StreamCommitWatcher. (#3492)
This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch HDDS-4454
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/HDDS-4454 by this push:
new cabc517bd5 HDDS-6842. [Ozone-Streaming] Reduce the number of watch requests in StreamCommitWatcher. (#3492)
cabc517bd5 is described below
commit cabc517bd5308dbe900187a50def225040efa3a1
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Thu Jun 9 23:40:02 2022 -0700
HDDS-6842. [Ozone-Streaming] Reduce the number of watch requests in StreamCommitWatcher. (#3492)
---
.../hdds/scm/storage/AbstractDataStreamOutput.java | 3 +-
.../hdds/scm/storage/StreamCommitWatcher.java | 39 +++++++++++++---------
.../ozone/freon/OzoneClientKeyGenerator.java | 12 +++----
3 files changed, 32 insertions(+), 22 deletions(-)
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/AbstractDataStreamOutput.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/AbstractDataStreamOutput.java
index e29670d781..cad1d04792 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/AbstractDataStreamOutput.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/AbstractDataStreamOutput.java
@@ -29,6 +29,7 @@ import org.apache.ratis.protocol.exceptions.RaftRetryFailureException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.Map;
+import java.util.Objects;
/**
* This class is used for error handling methods.
@@ -111,7 +112,7 @@ public abstract class AbstractDataStreamOutput
if (Thread.currentThread().isInterrupted()) {
setExceptionAndThrow(exception);
}
- Preconditions.checkNotNull(action);
+ Objects.requireNonNull(action);
Preconditions.checkArgument(
action.action == RetryPolicy.RetryAction.RetryDecision.RETRY);
if (action.delayMillis > 0) {
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamCommitWatcher.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamCommitWatcher.java
index 1820416d32..8ca70de816 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamCommitWatcher.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamCommitWatcher.java
@@ -16,17 +16,13 @@
* limitations under the License.
*/
-/**
- * This class maintains the map of the commitIndexes to be watched for
- * successful replication in the datanodes in a given pipeline. It also releases
- * the buffers associated with the user data back to {@Link BufferPool} once
- * minimum replication criteria is achieved during an ozone key write.
- */
package org.apache.hadoop.hdds.scm.storage;
import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.scm.XceiverClientReply;
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
+import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.MemoizedSupplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -34,6 +30,9 @@ import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
@@ -49,13 +48,15 @@ public class StreamCommitWatcher {
LoggerFactory.getLogger(StreamCommitWatcher.class);
private Map<Long, List<StreamBuffer>> commitIndexMap;
- private List<StreamBuffer> bufferList;
+ private final List<StreamBuffer> bufferList;
// total data which has been successfully flushed and acknowledged
// by all servers
private long totalAckDataLength;
+ private final ConcurrentMap<Long, CompletableFuture<XceiverClientReply>>
+ replies = new ConcurrentHashMap<>();
- private XceiverClientSpi xceiverClient;
+ private final XceiverClientSpi xceiverClient;
public StreamCommitWatcher(XceiverClientSpi xceiverClient,
List<StreamBuffer> bufferList) {
@@ -130,16 +131,24 @@ public class StreamCommitWatcher {
*/
public XceiverClientReply streamWatchForCommit(long commitIndex)
throws IOException {
- final long index;
+ final MemoizedSupplier<CompletableFuture<XceiverClientReply>> supplier
+ = JavaUtils.memoize(CompletableFuture::new);
+ final CompletableFuture<XceiverClientReply> f = replies.compute(commitIndex,
+ (key, value) -> value != null ? value : supplier.get());
+ if (!supplier.isInitialized()) {
+ // future already exists
+ return f.join();
+ }
+
try {
XceiverClientReply reply =
xceiverClient.watchForCommit(commitIndex);
- if (reply == null) {
- index = 0;
- } else {
- index = reply.getLogIndex();
- }
- adjustBuffers(index);
+ f.complete(reply);
+ final CompletableFuture<XceiverClientReply> removed
+ = replies.remove(commitIndex);
+ Preconditions.checkState(removed == f);
+
+ adjustBuffers(reply.getLogIndex());
return reply;
} catch (InterruptedException e) {
// Re-interrupt the thread while catching InterruptedException
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/OzoneClientKeyGenerator.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/OzoneClientKeyGenerator.java
index 6ab5c03009..43cdfcfc50 100644
--- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/OzoneClientKeyGenerator.java
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/OzoneClientKeyGenerator.java
@@ -24,7 +24,8 @@ import java.util.concurrent.Callable;
import org.apache.hadoop.hdds.cli.HddsVersionProvider;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.ozone.client.OzoneBucket;
import org.apache.hadoop.ozone.client.OzoneClient;
@@ -132,14 +133,13 @@ public class OzoneClientKeyGenerator extends BaseFreonGenerator
}
private void createStreamKey(long counter) throws Exception {
- final ReplicationConfig replicationConfig = ReplicationConfig
- .fromProtoTypeAndFactor(HddsProtos.ReplicationType.RATIS,
- HddsProtos.ReplicationFactor.THREE);
+ final ReplicationConfig conf = ReplicationConfig.fromProtoTypeAndFactor(
+ ReplicationType.RATIS, ReplicationFactor.THREE);
final String key = generateObjectName(counter);
timer.time(() -> {
- try (OzoneDataStreamOutput stream = bucket
- .createStreamKey(key, keySize, replicationConfig, metadata)) {
+ try (OzoneDataStreamOutput stream = bucket.createStreamKey(
+ key, keySize, conf, metadata)) {
contentGenerator.write(stream);
}
return null;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org