You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink itself was not preserved in this plain-text copy).
Posted to commits@ozone.apache.org by sz...@apache.org on 2022/06/09 02:17:31 UTC

[ozone] branch master updated: HDDS-6844. Reduce the number of watch requests in XceiverClientRatis. (#3493)

This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 1d7ad4ccd7 HDDS-6844. Reduce the number of watch requests in XceiverClientRatis. (#3493)
1d7ad4ccd7 is described below

commit 1d7ad4ccd7e9651b08487a801b15ecc7878f0f1c
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Wed Jun 8 19:17:26 2022 -0700

    HDDS-6844. Reduce the number of watch requests in XceiverClientRatis. (#3493)
---
 .../apache/hadoop/hdds/scm/XceiverClientRatis.java | 111 +++++++++++++--------
 1 file changed, 67 insertions(+), 44 deletions(-)

diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
index 6982d41fbc..d0fd0db129 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
@@ -24,7 +24,7 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.OptionalLong;
+import java.util.Optional;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
@@ -34,7 +34,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Supplier;
-import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import org.apache.hadoop.hdds.HddsUtils;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
@@ -55,6 +55,7 @@ import com.google.common.base.Preconditions;
 import org.apache.ratis.client.RaftClient;
 import org.apache.ratis.grpc.GrpcTlsConfig;
 import org.apache.ratis.proto.RaftProtos;
+import org.apache.ratis.proto.RaftProtos.ReplicationLevel;
 import org.apache.ratis.protocol.exceptions.GroupMismatchException;
 import org.apache.ratis.protocol.exceptions.RaftException;
 import org.apache.ratis.protocol.RaftClientReply;
@@ -103,7 +104,8 @@ public final class XceiverClientRatis extends XceiverClientSpi {
   // Map to track commit index at every server
   private final ConcurrentHashMap<UUID, Long> commitInfoMap;
 
-  private XceiverClientMetrics metrics;
+  private final XceiverClientMetrics metrics
+      = XceiverClientManager.getXceiverClientMetrics();
 
   /**
    * Constructs a client.
@@ -117,31 +119,46 @@ public final class XceiverClientRatis extends XceiverClientSpi {
     this.retryPolicy = retryPolicy;
     commitInfoMap = new ConcurrentHashMap<>();
     this.tlsConfig = tlsConfig;
-    metrics = XceiverClientManager.getXceiverClientMetrics();
     this.ozoneConfiguration = configuration;
+
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("new XceiverClientRatis for pipeline " + pipeline.getId(),
+          new Throwable("TRACE"));
+    }
+  }
+
+  private long updateCommitInfosMap(RaftClientReply reply) {
+    return Optional.ofNullable(reply)
+        .filter(RaftClientReply::isSuccess)
+        .map(RaftClientReply::getCommitInfos)
+        .map(this::updateCommitInfosMap)
+        .orElse(0L);
   }
 
-  private void updateCommitInfosMap(
+  private long updateCommitInfosMap(
       Collection<RaftProtos.CommitInfoProto> commitInfoProtos) {
     // if the commitInfo map is empty, just update the commit indexes for each
     // of the servers
+    final Stream<Long> stream;
     if (commitInfoMap.isEmpty()) {
-      commitInfoProtos.forEach(proto -> commitInfoMap
-          .put(RatisHelper.toDatanodeId(proto.getServer()),
-              proto.getCommitIndex()));
+      stream = commitInfoProtos.stream().map(this::putCommitInfo);
       // In case the commit is happening 2 way, just update the commitIndex
       // for the servers which have been successfully updating the commit
       // indexes. This is important because getReplicatedMinCommitIndex()
       // should always return the min commit index out of the nodes which have
       // been replicating data successfully.
     } else {
-      commitInfoProtos.forEach(proto -> commitInfoMap
+      stream = commitInfoProtos.stream().map(proto -> commitInfoMap
           .computeIfPresent(RatisHelper.toDatanodeId(proto.getServer()),
-              (address, index) -> {
-                index = proto.getCommitIndex();
-                return index;
-              }));
+              (address, index) -> proto.getCommitIndex()));
     }
+    return stream.mapToLong(Long::longValue).min().orElse(0);
+  }
+
+  private long putCommitInfo(RaftProtos.CommitInfoProto proto) {
+    final long index = proto.getCommitIndex();
+    commitInfoMap.put(RatisHelper.toDatanodeId(proto.getServer()), index);
+    return index;
   }
 
   /**
@@ -233,9 +250,8 @@ public final class XceiverClientRatis extends XceiverClientSpi {
   // gets the minimum log index replicated to all servers
   @Override
   public long getReplicatedMinCommitIndex() {
-    OptionalLong minIndex =
-        commitInfoMap.values().parallelStream().mapToLong(v -> v).min();
-    return minIndex.isPresent() ? minIndex.getAsLong() : 0;
+    return commitInfoMap.values().parallelStream()
+        .mapToLong(Long::longValue).min().orElse(0);
   }
 
   private void addDatanodetoReply(UUID address, XceiverClientReply reply) {
@@ -244,51 +260,58 @@ public final class XceiverClientRatis extends XceiverClientSpi {
     reply.addDatanode(builder.build());
   }
 
+  private XceiverClientReply newWatchReply(
+      long watchIndex, Object reason, long replyIndex) {
+    LOG.debug("watchForCommit({}) returns {} {}",
+        watchIndex, reason, replyIndex);
+    final XceiverClientReply reply = new XceiverClientReply(null);
+    reply.setLogIndex(replyIndex);
+    return reply;
+  }
+
   @Override
   public XceiverClientReply watchForCommit(long index)
       throws InterruptedException, ExecutionException, TimeoutException,
       IOException {
-    long commitIndex = getReplicatedMinCommitIndex();
-    XceiverClientReply clientReply = new XceiverClientReply(null);
-    if (commitIndex >= index) {
-      // return the min commit index till which the log has been replicated to
-      // all servers
-      clientReply.setLogIndex(commitIndex);
-      return clientReply;
+    final long replicatedMin = getReplicatedMinCommitIndex();
+    if (replicatedMin >= index) {
+      return newWatchReply(index, "replicatedMin", replicatedMin);
     }
-    RaftClientReply reply;
+
     try {
       CompletableFuture<RaftClientReply> replyFuture = getClient().async()
           .watch(index, RaftProtos.ReplicationLevel.ALL_COMMITTED);
-      replyFuture.get();
+      final RaftClientReply reply = replyFuture.get();
+      final long updated = updateCommitInfosMap(reply);
+      Preconditions.checkState(updated >= index);
+      return newWatchReply(index, ReplicationLevel.ALL_COMMITTED, updated);
     } catch (Exception e) {
       Throwable t = HddsClientUtils.checkForException(e);
       LOG.warn("3 way commit failed on pipeline {}", pipeline, e);
       if (t instanceof GroupMismatchException) {
         throw e;
       }
-      reply = getClient().async()
+      final RaftClientReply reply = getClient().async()
           .watch(index, RaftProtos.ReplicationLevel.MAJORITY_COMMITTED)
           .get();
-      List<RaftProtos.CommitInfoProto> commitInfoProtoList =
-          reply.getCommitInfos().stream()
-              .filter(i -> i.getCommitIndex() < index)
-              .collect(Collectors.toList());
-      commitInfoProtoList.parallelStream().forEach(proto -> {
-        UUID address = RatisHelper.toDatanodeId(proto.getServer());
-        addDatanodetoReply(address, clientReply);
-        // since 3 way commit has failed, the updated map from now on  will
-        // only store entries for those datanodes which have had successful
-        // replication.
-        commitInfoMap.remove(address);
-        LOG.info(
-            "Could not commit index {} on pipeline {} to all the nodes. " +
-            "Server {} has failed. Committed by majority.",
-            index, pipeline, address);
-      });
+      final XceiverClientReply clientReply = newWatchReply(
+          index, ReplicationLevel.MAJORITY_COMMITTED, index);
+      reply.getCommitInfos().stream()
+          .filter(i -> i.getCommitIndex() < index)
+          .forEach(proto -> {
+            UUID address = RatisHelper.toDatanodeId(proto.getServer());
+            addDatanodetoReply(address, clientReply);
+            // since 3 way commit has failed, the updated map from now on  will
+            // only store entries for those datanodes which have had successful
+            // replication.
+            commitInfoMap.remove(address);
+            LOG.info(
+                "Could not commit index {} on pipeline {} to all the nodes. " +
+                "Server {} has failed. Committed by majority.",
+                index, pipeline, address);
+          });
+      return clientReply;
     }
-    clientReply.setLogIndex(index);
-    return clientReply;
   }
 
   /**


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org