You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by sz...@apache.org on 2022/10/25 01:15:45 UTC

[ozone] 36/38: HDDS-6842. [Ozone-Streaming] Reduce the number of watch requests in StreamCommitWatcher. (#3492)

This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch HDDS-4454
in repository https://gitbox.apache.org/repos/asf/ozone.git

commit a5980ce1a4afe19b680454c445423a9200d1f39f
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Thu Jun 9 23:40:02 2022 -0700

    HDDS-6842. [Ozone-Streaming] Reduce the number of watch requests in StreamCommitWatcher. (#3492)
    
    (cherry picked from commit 8315f08791463eb6a2dc3d5fcd409156ff3e9a13)
    (cherry picked from commit 4c064f42aaa027baa0ca0c7ed373a143a26a6cec)
---
 .../hdds/scm/storage/AbstractDataStreamOutput.java |  3 +-
 .../hdds/scm/storage/StreamCommitWatcher.java      | 39 +++++++++++++---------
 .../ozone/freon/OzoneClientKeyGenerator.java       | 12 +++----
 3 files changed, 32 insertions(+), 22 deletions(-)

diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/AbstractDataStreamOutput.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/AbstractDataStreamOutput.java
index e29670d781..cad1d04792 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/AbstractDataStreamOutput.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/AbstractDataStreamOutput.java
@@ -29,6 +29,7 @@ import org.apache.ratis.protocol.exceptions.RaftRetryFailureException;
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.util.Map;
+import java.util.Objects;
 
 /**
  * This class is used for error handling methods.
@@ -111,7 +112,7 @@ public abstract class AbstractDataStreamOutput
     if (Thread.currentThread().isInterrupted()) {
       setExceptionAndThrow(exception);
     }
-    Preconditions.checkNotNull(action);
+    Objects.requireNonNull(action);
     Preconditions.checkArgument(
         action.action == RetryPolicy.RetryAction.RetryDecision.RETRY);
     if (action.delayMillis > 0) {
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamCommitWatcher.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamCommitWatcher.java
index 1820416d32..8ca70de816 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamCommitWatcher.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamCommitWatcher.java
@@ -16,17 +16,13 @@
  *  limitations under the License.
  */
 
-/**
- * This class maintains the map of the commitIndexes to be watched for
- * successful replication in the datanodes in a given pipeline. It also releases
- * the buffers associated with the user data back to {@Link BufferPool} once
- * minimum replication criteria is achieved during an ozone key write.
- */
 package org.apache.hadoop.hdds.scm.storage;
 
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdds.scm.XceiverClientReply;
 import org.apache.hadoop.hdds.scm.XceiverClientSpi;
+import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.MemoizedSupplier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -34,6 +30,9 @@ import java.io.IOException;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeoutException;
@@ -49,13 +48,15 @@ public class StreamCommitWatcher {
       LoggerFactory.getLogger(StreamCommitWatcher.class);
 
   private Map<Long, List<StreamBuffer>> commitIndexMap;
-  private List<StreamBuffer> bufferList;
+  private final List<StreamBuffer> bufferList;
 
   // total data which has been successfully flushed and acknowledged
   // by all servers
   private long totalAckDataLength;
+  private final ConcurrentMap<Long, CompletableFuture<XceiverClientReply>>
+      replies = new ConcurrentHashMap<>();
 
-  private XceiverClientSpi xceiverClient;
+  private final XceiverClientSpi xceiverClient;
 
   public StreamCommitWatcher(XceiverClientSpi xceiverClient,
       List<StreamBuffer> bufferList) {
@@ -130,16 +131,24 @@ public class StreamCommitWatcher {
    */
   public XceiverClientReply streamWatchForCommit(long commitIndex)
       throws IOException {
-    final long index;
+    final MemoizedSupplier<CompletableFuture<XceiverClientReply>> supplier
+        = JavaUtils.memoize(CompletableFuture::new);
+    final CompletableFuture<XceiverClientReply> f = replies.compute(commitIndex,
+        (key, value) -> value != null ? value : supplier.get());
+    if (!supplier.isInitialized()) {
+      // future already exists
+      return f.join();
+    }
+
     try {
       XceiverClientReply reply =
           xceiverClient.watchForCommit(commitIndex);
-      if (reply == null) {
-        index = 0;
-      } else {
-        index = reply.getLogIndex();
-      }
-      adjustBuffers(index);
+      f.complete(reply);
+      final CompletableFuture<XceiverClientReply> removed
+          = replies.remove(commitIndex);
+      Preconditions.checkState(removed == f);
+
+      adjustBuffers(reply.getLogIndex());
       return reply;
     } catch (InterruptedException e) {
       // Re-interrupt the thread while catching InterruptedException
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/OzoneClientKeyGenerator.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/OzoneClientKeyGenerator.java
index b0184bff67..b119e27ea6 100644
--- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/OzoneClientKeyGenerator.java
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/OzoneClientKeyGenerator.java
@@ -24,7 +24,8 @@ import java.util.concurrent.Callable;
 import org.apache.hadoop.hdds.cli.HddsVersionProvider;
 import org.apache.hadoop.hdds.client.ReplicationConfig;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
 import org.apache.hadoop.ozone.client.OzoneBucket;
 import org.apache.hadoop.ozone.client.OzoneClient;
 
@@ -132,14 +133,13 @@ public class OzoneClientKeyGenerator extends BaseFreonGenerator
   }
 
   private void createStreamKey(long counter) throws Exception {
-    final ReplicationConfig replicationConfig = ReplicationConfig
-        .fromProtoTypeAndFactor(HddsProtos.ReplicationType.RATIS,
-            HddsProtos.ReplicationFactor.THREE);
+    final ReplicationConfig conf = ReplicationConfig.fromProtoTypeAndFactor(
+        ReplicationType.RATIS, ReplicationFactor.THREE);
     final String key = generateObjectName(counter);
 
     timer.time(() -> {
-      try (OzoneDataStreamOutput stream = bucket
-          .createStreamKey(key, keySize, replicationConfig, metadata)) {
+      try (OzoneDataStreamOutput stream = bucket.createStreamKey(
+          key, keySize, conf, metadata)) {
         contentGenerator.write(stream);
       }
       return null;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org