You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2022/08/10 08:43:12 UTC

[ratis] branch master updated: RATIS-1663. Record call id for board casting a heartbeat. (#706)

This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 5dd3c1db0 RATIS-1663. Record call id for board casting a heartbeat. (#706)
5dd3c1db0 is described below

commit 5dd3c1db093bb06e462afbd0df4b8b215bbd8bf3
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Wed Aug 10 01:43:08 2022 -0700

    RATIS-1663. Record call id for board casting a heartbeat. (#706)
---
 .../src/main/java/org/apache/ratis/rpc/CallId.java | 12 ++++
 .../apache/ratis/grpc/server/GrpcLogAppender.java  | 28 +++++++--
 .../apache/ratis/server/leader/LogAppender.java    |  7 +++
 .../org/apache/ratis/server/impl/ReadRequests.java | 66 ++++++++++++++++++++++
 .../ratis/server/leader/LogAppenderDefault.java    | 16 +++++-
 5 files changed, 120 insertions(+), 9 deletions(-)

diff --git a/ratis-common/src/main/java/org/apache/ratis/rpc/CallId.java b/ratis-common/src/main/java/org/apache/ratis/rpc/CallId.java
index a6914e27b..85e6ef06b 100644
--- a/ratis-common/src/main/java/org/apache/ratis/rpc/CallId.java
+++ b/ratis-common/src/main/java/org/apache/ratis/rpc/CallId.java
@@ -17,6 +17,7 @@
  */
 package org.apache.ratis.rpc;
 
+import java.util.Comparator;
 import java.util.concurrent.atomic.AtomicLong;
 
 /**
@@ -27,6 +28,17 @@ import java.util.concurrent.atomic.AtomicLong;
 public final class CallId {
   private static final AtomicLong CALL_ID_COUNTER = new AtomicLong();
 
+  private static final Comparator<Long> COMPARATOR = (left, right) -> {
+    final long diff = left - right;
+    // check diff < Long.MAX_VALUE/2 for the possibility of numerical overflow
+    return diff == 0? 0: diff > 0 && diff < Long.MAX_VALUE/2? 1: -1;
+  };
+
+  /** @return a long comparator, which takes care of the possibility of numerical overflow, for comparing call ids. */
+  public static Comparator<Long> getComparator() {
+    return COMPARATOR;
+  }
+
   /** @return the default value. */
   public static long getDefault() {
     return 0;
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
index 3f01ea299..3e33a1787 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
@@ -47,6 +47,7 @@ import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.ratis.thirdparty.com.codahale.metrics.Timer;
 
@@ -56,9 +57,16 @@ import org.apache.ratis.thirdparty.com.codahale.metrics.Timer;
 public class GrpcLogAppender extends LogAppenderBase {
   public static final Logger LOG = LoggerFactory.getLogger(GrpcLogAppender.class);
 
+  private static final Comparator<Long> CALL_ID_COMPARATOR = (left, right) -> {
+    // calculate diff in order to take care of the possibility of numerical overflow
+    final long diff = left - right;
+    return diff == 0? 0: diff > 0? 1: -1;
+  };
+
+  private final AtomicLong callId = new AtomicLong();
+
   private final RequestMap pendingRequests = new RequestMap();
   private final int maxPendingRequestsNum;
-  private long callId = 0;
   private volatile boolean firstResponseReceived = false;
   private final boolean installSnapshotEnabled;
 
@@ -235,15 +243,23 @@ public class GrpcLogAppender extends LogAppenderBase {
     }
   }
 
+  @Override
+  public long getCallId() {
+    return callId.get();
+  }
+
+  @Override
+  public Comparator<Long> getCallIdComparator() {
+    return CALL_ID_COMPARATOR;
+  }
+
   private void appendLog(boolean excludeLogEntries) throws IOException {
     final AppendEntriesRequestProto pending;
     final AppendEntriesRequest request;
-    final StreamObserver<AppendEntriesRequestProto> s;
     try (AutoCloseableLock writeLock = lock.writeLock(caller, LOG::trace)) {
-      // prepare and enqueue the append request. note changes on follower's
-      // nextIndex and ops on pendingRequests should always be associated
-      // together and protected by the lock
-      pending = newAppendEntriesRequest(callId++, excludeLogEntries);
+      // Prepare and send the append request.
+      // Note that changes to follower's nextIndex and operations on pendingRequests should always be done under the write-lock
+      pending = newAppendEntriesRequest(callId.getAndIncrement(), excludeLogEntries);
       if (pending == null) {
         return;
       }
diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java b/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java
index 135b4318d..ef5e1a7ed 100644
--- a/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java
+++ b/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java
@@ -32,6 +32,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.Comparator;
 
 /**
  * A {@link LogAppender} is for the leader to send appendEntries to a particular follower.
@@ -81,6 +82,12 @@ public interface LogAppender {
     return getFollower().getPeer().getId();
   }
 
+  /** @return the call id for the next {@link AppendEntriesRequestProto}. */
+  long getCallId();
+
+  /** @return a {@link Comparator} for comparing call ids. */
+  Comparator<Long> getCallIdComparator();
+
   /**
    * Create a {@link AppendEntriesRequestProto} object using the {@link FollowerInfo} of this {@link LogAppender}.
    * The {@link AppendEntriesRequestProto} object may contain zero or more log entries.
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ReadRequests.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ReadRequests.java
new file mode 100644
index 000000000..b8d8998c7
--- /dev/null
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ReadRequests.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.impl;
+
+import org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto;
+import org.apache.ratis.server.leader.LogAppender;
+
+/** For supporting linearizable read. */
+class ReadRequests {
+  /** The acknowledgement from a {@link LogAppender} of a heartbeat for a particular call id. */
+  static class HeartbeatAck {
+    private final LogAppender appender;
+    private final long minCallId;
+    private volatile boolean acknowledged = false;
+
+    HeartbeatAck(LogAppender appender) {
+      this.appender = appender;
+      this.minCallId = appender.getCallId();
+    }
+
+    /** Is the heartbeat (for a particular call id) acknowledged? */
+    boolean isAcknowledged() {
+      return acknowledged;
+    }
+
+    /**
+     * @return true if the acknowledged state is changed from false to true;
+     *         otherwise, the acknowledged state remains unchanged, return false.
+     */
+    boolean receive(AppendEntriesReplyProto reply) {
+      if (acknowledged) {
+        return false;
+      }
+      synchronized (this) {
+        if (!acknowledged && isValid(reply)) {
+          acknowledged = true;
+          return true;
+        }
+        return false;
+      }
+    }
+
+    private boolean isValid(AppendEntriesReplyProto reply) {
+      if (reply == null || !reply.getServerReply().getSuccess()) {
+        return false;
+      }
+      // valid only if the reply has a later call id than the min.
+      return appender.getCallIdComparator().compare(reply.getServerReply().getCallId(), minCallId) >= 0;
+    }
+  }
+}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java
index c9d341409..0c91427e5 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java
@@ -17,7 +17,6 @@
  */
 package org.apache.ratis.server.leader;
 
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto;
 import org.apache.ratis.proto.RaftProtos.AppendEntriesRequestProto;
 import org.apache.ratis.proto.RaftProtos.InstallSnapshotReplyProto;
@@ -30,6 +29,7 @@ import org.apache.ratis.statemachine.SnapshotInfo;
 
 import java.io.IOException;
 import java.io.InterruptedIOException;
+import java.util.Comparator;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
@@ -42,12 +42,22 @@ class LogAppenderDefault extends LogAppenderBase {
     super(server, leaderState, f);
   }
 
+  @Override
+  public long getCallId() {
+    return CallId.get();
+  }
+
+  @Override
+  public Comparator<Long> getCallIdComparator() {
+    return CallId.getComparator();
+  }
+
   /** Send an appendEntries RPC; retry indefinitely. */
-  @SuppressFBWarnings("RCN_REDUNDANT_NULLCHECK_OF_NULL_VALUE")
   private AppendEntriesReplyProto sendAppendEntriesWithRetries()
       throws InterruptedException, InterruptedIOException, RaftLogIOException {
     int retry = 0;
-    AppendEntriesRequestProto request = null;
+
+    AppendEntriesRequestProto request = newAppendEntriesRequest(CallId.getAndIncrement(), false);
     while (isRunning()) { // keep retrying for IOException
       try {
         if (request == null || request.getEntriesCount() == 0) {