You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@iotdb.apache.org by GitBox <gi...@apache.org> on 2021/03/04 06:45:36 UTC

[GitHub] [iotdb] neuyilan commented on a change in pull request #2766: [IOTDB-1118] [Distributed] Limit query log lag in mid consistency level

neuyilan commented on a change in pull request #2766:
URL: https://github.com/apache/iotdb/pull/2766#discussion_r587176306



##########
File path: cluster/src/assembly/resources/conf/iotdb-cluster.properties
##########
@@ -148,4 +148,8 @@ enable_use_persist_log_on_disk_to_catch_up=true
 
 # The number of logs read on the disk at one time, which is mainly used to control the memory usage.
 # This value multiplied by the log size is about the amount of memory used to read logs from the disk at one time.
-max_number_of_logs_per_fetch_on_disk=1000
\ No newline at end of file
+max_number_of_logs_per_fetch_on_disk=1000
+
+# When consistency level set to mid, query failed if the log lag exceeds max_read_log_lag
+# This default value is 1000
+max_read_log_lag=1000

Review comment:
       Perhaps we need to describe in detail what this parameter represents, such as with the `insertRecord()` interface, where a single log represents a row data, whereas if the `insertTablet()` interface is used, where a single log may represent several thousand rows of data (depending on the size of the tablet set by the user), these things need to be known to users to set this parameter reasonably according to their business logic.

##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
##########
@@ -806,32 +850,42 @@ public void waitLeader() {
    * it.
    *
    * @return true if this node has caught up before timeout, false otherwise
+   * @throws CheckConsistencyException if leaderCommitId bigger than localAppliedId a threshold
+   *     value after timeout
    */
-  private boolean waitUntilCatchUp() {
-    long startTime = System.currentTimeMillis();
-    long waitedTime = 0;
-    long leaderCommitId;
+  protected boolean waitUntilCatchUp(CheckConsistency checkConsistency)
+      throws CheckConsistencyException {
+    long leaderCommitId = Long.MIN_VALUE;
     try {
-      leaderCommitId =
-          ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()
-              ? requestCommitIdAsync()
-              : requestCommitIdSync();
-      if (leaderCommitId == Long.MAX_VALUE) {
-        // Long.MAX_VALUE representing there is a network issue
-        return false;
-      }
+      leaderCommitId = config.isUseAsyncServer() ? requestCommitIdAsync() : requestCommitIdSync();

Review comment:
       Maybe we should discuss whether we need to use the leaderCommitId in mid consistency, because, the applied index is updated in one separate thread, which means that in a very short time of one node, the applied index must be smaller than the commit index. 
   
   In the process of writing, the applied index of the local node is always smaller than the commit index, not to mention the comparison between the local applied index and the commit index of the leader. This leads to the need to wait for a heartbeat time before judging whether the local applied index is catching up with the leader commit index.

##########
File path: cluster/src/test/java/org/apache/iotdb/cluster/server/member/MemberTest.java
##########
@@ -109,6 +111,9 @@ public void setUp() throws Exception {
     IoTDBDescriptor.getInstance().getConfig().setEnableWal(false);
     RaftMember.setWaitLeaderTimeMs(10);
 
+    RaftServer.setSyncLeaderMaxWaitMs(100);
+    RaftServer.setHeartBeatIntervalMs(100);

Review comment:
       Maybe we need to save the previous settings and then reset them after the test is finished.

##########
File path: cluster/src/test/java/org/apache/iotdb/cluster/server/member/MemberTest.java
##########
@@ -343,121 +356,211 @@ public AsyncClient getAsyncClient(Node node) {
     return newMember;
   }
 
-  @Test
-  public void testsyncLeaderWithConsistencyCheck() {
+  private DataGroupMember newDataGroupMemberWithSyncLeaderTrue(Node node, boolean syncLeader) {
+    DataGroupMember newMember =
+        new TestDataGroupMember(node, partitionTable.getHeaderGroup(node)) {
+
+          @Override
+          public boolean syncLeader(RaftMember.CheckConsistency checkConsistency) {
+            return syncLeader;
+          }
+
+          @Override
+          protected long requestCommitIdAsync() {
+            return 1000L;
+          }
+
+          @Override
+          public long appendEntry(AppendEntryRequest request) {
+            return Response.RESPONSE_AGREE;
+          }
+
+          @Override
+          public AsyncClient getAsyncClient(Node node) {
+            try {
+              return new TestAsyncDataClient(node, dataGroupMemberMap);
+            } catch (IOException e) {
+              return null;
+            }
+          }
+        };
+    newMember.setThisNode(node);
+    newMember.setMetaGroupMember(testMetaMember);
+    newMember.setLeader(node);
+    newMember.setCharacter(NodeCharacter.LEADER);
+    newMember.setLogManager(new TestPartitionedLogManager());
+    newMember.setAppendLogThreadPool(testThreadPool);
+    return newMember;
+  }
 
+  @Test
+  public void testsyncLeaderStrongConsistencyCheckFalse() {
     // 1. write request : Strong consistency level with syncLeader false
     DataGroupMember dataGroupMemberWithWriteStrongConsistencyFalse =
-        newDataGroupMemberWithSyncLeader(TestUtils.getNode(0), false);
+        newDataGroupMemberWithSyncLeaderFalse(TestUtils.getNode(0), false);
     ClusterDescriptor.getInstance()
         .getConfig()
-        .setConsistencyLevel(ConsistencyLevel.WEAK_CONSISTENCY);
-    CheckConsistencyException exception = null;
+        .setConsistencyLevel(ConsistencyLevel.STRONG_CONSISTENCY);
     try {
-      dataGroupMemberWithWriteStrongConsistencyFalse.syncLeaderWithConsistencyCheck(true);
+      dataGroupMemberWithWriteStrongConsistencyFalse.waitUntilCatchUp(
+          new RaftMember.CheckConsistency() {
+            @Override
+            public void postCheckConsistency(long leaderCommitId, long localAppliedId)
+                throws CheckConsistencyException {
+              if (leaderCommitId == Long.MAX_VALUE
+                  || leaderCommitId == Long.MIN_VALUE
+                  || leaderCommitId > localAppliedId) {
+                throw CheckConsistencyException.CHECK_STRONG_CONSISTENCY_EXCEPTION;
+              }
+            }
+          });

Review comment:
       Why not just use `StrongCheckConsistency`?
   The same as others.

##########
File path: cluster/src/test/java/org/apache/iotdb/cluster/server/member/MemberTest.java
##########
@@ -190,6 +195,9 @@ public void tearDown() throws Exception {
     ClusterDescriptor.getInstance().getConfig().setRaftLogBufferSize(preLogBufferSize);
     ClusterDescriptor.getInstance().getConfig().setUseAsyncApplier(prevUseAsyncApplier);
     IoTDBDescriptor.getInstance().getConfig().setEnableWal(prevEnableWAL);
+
+    RaftServer.setSyncLeaderMaxWaitMs(20 * 1000);
+    RaftServer.setHeartBeatIntervalMs(1000L);

Review comment:
       It's better to use the saved configuration. We can't guarantee that the default configuration will not be modified.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org