You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by sz...@apache.org on 2022/06/09 02:17:31 UTC
[ozone] branch master updated: HDDS-6844. Reduce the number of watch requests in XceiverClientRatis. (#3493)
This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 1d7ad4ccd7 HDDS-6844. Reduce the number of watch requests in XceiverClientRatis. (#3493)
1d7ad4ccd7 is described below
commit 1d7ad4ccd7e9651b08487a801b15ecc7878f0f1c
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Wed Jun 8 19:17:26 2022 -0700
HDDS-6844. Reduce the number of watch requests in XceiverClientRatis. (#3493)
---
.../apache/hadoop/hdds/scm/XceiverClientRatis.java | 111 +++++++++++++--------
1 file changed, 67 insertions(+), 44 deletions(-)
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
index 6982d41fbc..d0fd0db129 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
@@ -24,7 +24,7 @@ import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
-import java.util.OptionalLong;
+import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
@@ -34,7 +34,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
-import java.util.stream.Collectors;
+import java.util.stream.Stream;
import org.apache.hadoop.hdds.HddsUtils;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
@@ -55,6 +55,7 @@ import com.google.common.base.Preconditions;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.grpc.GrpcTlsConfig;
import org.apache.ratis.proto.RaftProtos;
+import org.apache.ratis.proto.RaftProtos.ReplicationLevel;
import org.apache.ratis.protocol.exceptions.GroupMismatchException;
import org.apache.ratis.protocol.exceptions.RaftException;
import org.apache.ratis.protocol.RaftClientReply;
@@ -103,7 +104,8 @@ public final class XceiverClientRatis extends XceiverClientSpi {
// Map to track commit index at every server
private final ConcurrentHashMap<UUID, Long> commitInfoMap;
- private XceiverClientMetrics metrics;
+ private final XceiverClientMetrics metrics
+ = XceiverClientManager.getXceiverClientMetrics();
/**
* Constructs a client.
@@ -117,31 +119,46 @@ public final class XceiverClientRatis extends XceiverClientSpi {
this.retryPolicy = retryPolicy;
commitInfoMap = new ConcurrentHashMap<>();
this.tlsConfig = tlsConfig;
- metrics = XceiverClientManager.getXceiverClientMetrics();
this.ozoneConfiguration = configuration;
+
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("new XceiverClientRatis for pipeline " + pipeline.getId(),
+ new Throwable("TRACE"));
+ }
+ }
+
+ private long updateCommitInfosMap(RaftClientReply reply) {
+ return Optional.ofNullable(reply)
+ .filter(RaftClientReply::isSuccess)
+ .map(RaftClientReply::getCommitInfos)
+ .map(this::updateCommitInfosMap)
+ .orElse(0L);
}
- private void updateCommitInfosMap(
+ private long updateCommitInfosMap(
Collection<RaftProtos.CommitInfoProto> commitInfoProtos) {
// if the commitInfo map is empty, just update the commit indexes for each
// of the servers
+ final Stream<Long> stream;
if (commitInfoMap.isEmpty()) {
- commitInfoProtos.forEach(proto -> commitInfoMap
- .put(RatisHelper.toDatanodeId(proto.getServer()),
- proto.getCommitIndex()));
+ stream = commitInfoProtos.stream().map(this::putCommitInfo);
// In case the commit is happening 2 way, just update the commitIndex
// for the servers which have been successfully updating the commit
// indexes. This is important because getReplicatedMinCommitIndex()
// should always return the min commit index out of the nodes which have
// been replicating data successfully.
} else {
- commitInfoProtos.forEach(proto -> commitInfoMap
+ stream = commitInfoProtos.stream().map(proto -> commitInfoMap
.computeIfPresent(RatisHelper.toDatanodeId(proto.getServer()),
- (address, index) -> {
- index = proto.getCommitIndex();
- return index;
- }));
+ (address, index) -> proto.getCommitIndex()));
}
+ return stream.mapToLong(Long::longValue).min().orElse(0);
+ }
+
+ private long putCommitInfo(RaftProtos.CommitInfoProto proto) {
+ final long index = proto.getCommitIndex();
+ commitInfoMap.put(RatisHelper.toDatanodeId(proto.getServer()), index);
+ return index;
}
/**
@@ -233,9 +250,8 @@ public final class XceiverClientRatis extends XceiverClientSpi {
// gets the minimum log index replicated to all servers
@Override
public long getReplicatedMinCommitIndex() {
- OptionalLong minIndex =
- commitInfoMap.values().parallelStream().mapToLong(v -> v).min();
- return minIndex.isPresent() ? minIndex.getAsLong() : 0;
+ return commitInfoMap.values().parallelStream()
+ .mapToLong(Long::longValue).min().orElse(0);
}
private void addDatanodetoReply(UUID address, XceiverClientReply reply) {
@@ -244,51 +260,58 @@ public final class XceiverClientRatis extends XceiverClientSpi {
reply.addDatanode(builder.build());
}
+ private XceiverClientReply newWatchReply(
+ long watchIndex, Object reason, long replyIndex) {
+ LOG.debug("watchForCommit({}) returns {} {}",
+ watchIndex, reason, replyIndex);
+ final XceiverClientReply reply = new XceiverClientReply(null);
+ reply.setLogIndex(replyIndex);
+ return reply;
+ }
+
@Override
public XceiverClientReply watchForCommit(long index)
throws InterruptedException, ExecutionException, TimeoutException,
IOException {
- long commitIndex = getReplicatedMinCommitIndex();
- XceiverClientReply clientReply = new XceiverClientReply(null);
- if (commitIndex >= index) {
- // return the min commit index till which the log has been replicated to
- // all servers
- clientReply.setLogIndex(commitIndex);
- return clientReply;
+ final long replicatedMin = getReplicatedMinCommitIndex();
+ if (replicatedMin >= index) {
+ return newWatchReply(index, "replicatedMin", replicatedMin);
}
- RaftClientReply reply;
+
try {
CompletableFuture<RaftClientReply> replyFuture = getClient().async()
.watch(index, RaftProtos.ReplicationLevel.ALL_COMMITTED);
- replyFuture.get();
+ final RaftClientReply reply = replyFuture.get();
+ final long updated = updateCommitInfosMap(reply);
+ Preconditions.checkState(updated >= index);
+ return newWatchReply(index, ReplicationLevel.ALL_COMMITTED, updated);
} catch (Exception e) {
Throwable t = HddsClientUtils.checkForException(e);
LOG.warn("3 way commit failed on pipeline {}", pipeline, e);
if (t instanceof GroupMismatchException) {
throw e;
}
- reply = getClient().async()
+ final RaftClientReply reply = getClient().async()
.watch(index, RaftProtos.ReplicationLevel.MAJORITY_COMMITTED)
.get();
- List<RaftProtos.CommitInfoProto> commitInfoProtoList =
- reply.getCommitInfos().stream()
- .filter(i -> i.getCommitIndex() < index)
- .collect(Collectors.toList());
- commitInfoProtoList.parallelStream().forEach(proto -> {
- UUID address = RatisHelper.toDatanodeId(proto.getServer());
- addDatanodetoReply(address, clientReply);
- // since 3 way commit has failed, the updated map from now on will
- // only store entries for those datanodes which have had successful
- // replication.
- commitInfoMap.remove(address);
- LOG.info(
- "Could not commit index {} on pipeline {} to all the nodes. " +
- "Server {} has failed. Committed by majority.",
- index, pipeline, address);
- });
+ final XceiverClientReply clientReply = newWatchReply(
+ index, ReplicationLevel.MAJORITY_COMMITTED, index);
+ reply.getCommitInfos().stream()
+ .filter(i -> i.getCommitIndex() < index)
+ .forEach(proto -> {
+ UUID address = RatisHelper.toDatanodeId(proto.getServer());
+ addDatanodetoReply(address, clientReply);
+ // since 3 way commit has failed, the updated map from now on will
+ // only store entries for those datanodes which have had successful
+ // replication.
+ commitInfoMap.remove(address);
+ LOG.info(
+ "Could not commit index {} on pipeline {} to all the nodes. " +
+ "Server {} has failed. Committed by majority.",
+ index, pipeline, address);
+ });
+ return clientReply;
}
- clientReply.setLogIndex(index);
- return clientReply;
}
/**
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org