You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ta...@apache.org on 2022/05/11 12:24:33 UTC

[iotdb] branch jira3167 created (now 6a4febe4c5)

This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a change to branch jira3167
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at 6a4febe4c5 finish

This branch includes the following new commits:

     new 6a4febe4c5 finish

The 1 revision listed above as "new" is entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: finish

Posted by ta...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch jira3167
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 6a4febe4c58d06a789ddf2a37fcf870fd69c8cec
Author: LebronAl <TX...@gmail.com>
AuthorDate: Wed May 11 20:23:59 2022 +0800

    finish
---
 confignode/src/assembly/resources/conf/logback.xml |  2 +-
 .../iotdb/consensus/ratis/RatisConsensus.java      | 92 ++++++++++++++++------
 .../commons/client/ClientFactoryProperty.java      |  2 +-
 3 files changed, 70 insertions(+), 26 deletions(-)

diff --git a/confignode/src/assembly/resources/conf/logback.xml b/confignode/src/assembly/resources/conf/logback.xml
index e5181c56dd..9c538adb4d 100644
--- a/confignode/src/assembly/resources/conf/logback.xml
+++ b/confignode/src/assembly/resources/conf/logback.xml
@@ -136,5 +136,5 @@
         <appender-ref ref="stdout"/>
     </root>
     <logger level="info" name="org.apache.iotdb.confignode"/>
-    <logger level="warn" name="org.apache.ratis"/>
+    <logger level="info" name="org.apache.ratis"/>
 </configuration>
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
index 8d644bbc81..f714b2bf65 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
@@ -58,6 +58,7 @@ import org.apache.ratis.protocol.RaftGroup;
 import org.apache.ratis.protocol.RaftGroupId;
 import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.protocol.exceptions.NotLeaderException;
+import org.apache.ratis.server.DivisionInfo;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.util.NetUtils;
@@ -72,6 +73,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 
@@ -103,6 +105,9 @@ class RatisConsensus implements IConsensus {
   private static final int DEFAULT_PRIORITY = 0;
   private static final int LEADER_PRIORITY = 1;
 
+  // TODO make it configurable
+  private static final int DEFAULT_WAIT_LEADER_READY_TIMEOUT = (int) TimeUnit.SECONDS.toMillis(20);
+
   /**
    * @param ratisStorageDir different groups of RatisConsensus Peer all share ratisStorageDir as
    *     root dir
@@ -149,13 +154,13 @@ class RatisConsensus implements IConsensus {
    */
   @Override
   public ConsensusWriteResponse write(
-      ConsensusGroupId groupId, IConsensusRequest IConsensusRequest) {
+      ConsensusGroupId consensusGroupId, IConsensusRequest IConsensusRequest) {
 
     // pre-condition: group exists and myself server serves this group
-    RaftGroupId raftGroupId = Utils.fromConsensusGroupIdToRaftGroupId(groupId);
+    RaftGroupId raftGroupId = Utils.fromConsensusGroupIdToRaftGroupId(consensusGroupId);
     RaftGroup raftGroup = getGroupInfo(raftGroupId);
     if (raftGroup == null || !raftGroup.getPeers().contains(myself)) {
-      return failedWrite(new ConsensusGroupNotExistException(groupId));
+      return failedWrite(new ConsensusGroupNotExistException(consensusGroupId));
     }
 
     // serialize request into Message
@@ -163,22 +168,24 @@ class RatisConsensus implements IConsensus {
 
     // 1. first try the local server
     RaftClientRequest clientRequest =
-        buildRawRequest(groupId, message, RaftClientRequest.writeRequestType());
+        buildRawRequest(raftGroupId, message, RaftClientRequest.writeRequestType());
     RaftClientReply localServerReply;
     RaftPeer suggestedLeader = null;
-    try {
-      localServerReply = server.submitClientRequest(clientRequest);
-      if (localServerReply.isSuccess()) {
-        ResponseMessage responseMessage = (ResponseMessage) localServerReply.getMessage();
-        TSStatus writeStatus = (TSStatus) responseMessage.getContentHolder();
-        return ConsensusWriteResponse.newBuilder().setStatus(writeStatus).build();
+    if (isLeader(consensusGroupId) && waitUntilLeaderReady(raftGroupId)) {
+      try {
+        localServerReply = server.submitClientRequest(clientRequest);
+        if (localServerReply.isSuccess()) {
+          ResponseMessage responseMessage = (ResponseMessage) localServerReply.getMessage();
+          TSStatus writeStatus = (TSStatus) responseMessage.getContentHolder();
+          return ConsensusWriteResponse.newBuilder().setStatus(writeStatus).build();
+        }
+        NotLeaderException ex = localServerReply.getNotLeaderException();
+        if (ex != null) { // local server is not leader
+          suggestedLeader = ex.getSuggestedLeader();
+        }
+      } catch (IOException e) {
+        return failedWrite(new RatisRequestFailedException(e));
       }
-      NotLeaderException ex = localServerReply.getNotLeaderException();
-      if (ex != null) { // local server is not leader
-        suggestedLeader = ex.getSuggestedLeader();
-      }
-    } catch (IOException e) {
-      return failedWrite(new RatisRequestFailedException(e));
     }
 
     // 2. try raft client
@@ -209,18 +216,20 @@ class RatisConsensus implements IConsensus {
 
   /** Read directly from LOCAL COPY notice: May read stale data (not linearizable) */
   @Override
-  public ConsensusReadResponse read(ConsensusGroupId groupId, IConsensusRequest IConsensusRequest) {
-
-    RaftGroup group = getGroupInfo(Utils.fromConsensusGroupIdToRaftGroupId(groupId));
+  public ConsensusReadResponse read(
+      ConsensusGroupId consensusGroupId, IConsensusRequest IConsensusRequest) {
+    RaftGroupId groupId = Utils.fromConsensusGroupIdToRaftGroupId(consensusGroupId);
+    RaftGroup group = getGroupInfo(groupId);
     if (group == null || !group.getPeers().contains(myself)) {
-      return failedRead(new ConsensusGroupNotExistException(groupId));
+      return failedRead(new ConsensusGroupNotExistException(consensusGroupId));
     }
 
     RaftClientReply reply;
     try {
       RequestMessage message = new RequestMessage(IConsensusRequest);
       RaftClientRequest clientRequest =
-          buildRawRequest(groupId, message, RaftClientRequest.staleReadRequestType(-1));
+          buildRawRequest(
+              groupId, message, RaftClientRequest.staleReadRequestType(getCommitIndex(groupId)));
       reply = server.submitClientRequest(clientRequest);
       if (!reply.isSuccess()) {
         return failedRead(new RatisRequestFailedException(reply.getException()));
@@ -476,6 +485,32 @@ class RatisConsensus implements IConsensus {
     return isLeader;
   }
 
+  private boolean waitUntilLeaderReady(RaftGroupId groupId) {
+    DivisionInfo divisionInfo;
+    try {
+      divisionInfo = server.getDivision(groupId).getInfo();
+    } catch (IOException e) {
+      // if the query fails, simply return not leader
+      logger.info("isLeaderReady checking failed with exception: ", e);
+      return false;
+    }
+    long startTime = System.currentTimeMillis();
+    try {
+      while (divisionInfo.isLeader() && !divisionInfo.isLeaderReady()) {
+        Thread.sleep(100);
+        long consumedTime = System.currentTimeMillis() - startTime;
+        if (consumedTime >= DEFAULT_WAIT_LEADER_READY_TIMEOUT) {
+          logger.warn("{}: leader is still not ready after {}ms", groupId, consumedTime);
+          return false;
+        }
+      }
+    } catch (InterruptedException e) {
+      logger.warn("Unexpected interruption", e);
+      return false;
+    }
+    return divisionInfo.isLeader();
+  }
+
   @Override
   public Peer getLeader(ConsensusGroupId groupId) {
     if (isLeader(groupId)) {
@@ -512,12 +547,12 @@ class RatisConsensus implements IConsensus {
   }
 
   private RaftClientRequest buildRawRequest(
-      ConsensusGroupId groupId, Message message, RaftClientRequest.Type type) {
+      RaftGroupId groupId, Message message, RaftClientRequest.Type type) {
     return RaftClientRequest.newBuilder()
         .setServerId(server.getId())
         .setClientId(localFakeId)
         .setCallId(localFakeCallId.incrementAndGet())
-        .setGroupId(Utils.fromConsensusGroupIdToRaftGroupId(groupId))
+        .setGroupId(groupId)
         .setType(type)
         .setMessage(message)
         .build();
@@ -534,11 +569,20 @@ class RatisConsensus implements IConsensus {
         lastSeen.put(raftGroupId, raftGroup);
       }
     } catch (IOException e) {
-      logger.debug("get group failed ", e);
+      logger.debug("get group {} failed ", raftGroupId, e);
     }
     return raftGroup;
   }
 
+  private long getCommitIndex(RaftGroupId raftGroupId) {
+    try {
+      return server.getDivision(raftGroupId).getRaftLog().getLastCommittedIndex();
+    } catch (IOException e) {
+      logger.debug("get group {} failed ", raftGroupId, e);
+    }
+    return -1;
+  }
+
   private RaftGroup buildRaftGroup(ConsensusGroupId groupId, List<Peer> peers) {
     return RaftGroup.valueOf(
         Utils.fromConsensusGroupIdToRaftGroupId(groupId),
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientFactoryProperty.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientFactoryProperty.java
index f1801628e4..2ff20065e8 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientFactoryProperty.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientFactoryProperty.java
@@ -89,7 +89,7 @@ public class ClientFactoryProperty {
     private DefaultProperty() {}
 
     public static final boolean RPC_THRIFT_COMPRESSED_ENABLED = false;
-    public static final int CONNECTION_TIMEOUT_MS = (int) TimeUnit.SECONDS.toMillis(20);;
+    public static final int CONNECTION_TIMEOUT_MS = (int) TimeUnit.SECONDS.toMillis(20);
     public static final int SELECTOR_NUM_OF_ASYNC_CLIENT_MANAGER = 1;
   }
 }