You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/07/29 06:28:21 UTC

[iotdb] branch ml_0729_test updated: add cache logic in data region statemachine for multi leader write

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch ml_0729_test
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/ml_0729_test by this push:
     new bbf2bf1dff add cache logic in data region statemachine for multi leader write
bbf2bf1dff is described below

commit bbf2bf1dff1fbe128d6422fb0fcc48e4699f3ecf
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Fri Jul 29 14:27:18 2022 +0800

    add cache logic in data region statemachine for multi leader write
---
 .../common/request/IndexedConsensusRequest.java    | 13 ++++++
 .../multileader/MultiLeaderServerImpl.java         |  5 ++-
 .../service/MultiLeaderRPCServiceProcessor.java    |  9 ++++-
 .../statemachine/DataRegionStateMachine.java       | 46 +++++++++++++++++++++-
 4 files changed, 68 insertions(+), 5 deletions(-)

diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java b/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java
index de3aca433b..9c5112fa2b 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java
@@ -29,11 +29,20 @@ public class IndexedConsensusRequest implements IConsensusRequest {
   /** we do not need to serialize these two fields as they are useless in other nodes. */
   private final long searchIndex;
 
+  private final long syncIndex;
   private final List<IConsensusRequest> requests;
 
   public IndexedConsensusRequest(long searchIndex, List<IConsensusRequest> requests) {
     this.searchIndex = searchIndex;
     this.requests = requests;
+    this.syncIndex = -1L;
+  }
+
+  /**
+   * Creates a request that carries an explicit sync index in addition to the search index.
+   *
+   * @param searchIndex value stored as this request's search index
+   * @param syncIndex value stored as this request's sync index
+   * @param requests the consensus requests wrapped by this instance (stored by reference)
+   */
+  public IndexedConsensusRequest(
+      long searchIndex, long syncIndex, List<IConsensusRequest> requests) {
+    // Assign in the same order as the parameter list.
+    this.searchIndex = searchIndex;
+    this.syncIndex = syncIndex;
+    this.requests = requests;
   }
 
   @Override
@@ -49,6 +58,10 @@ public class IndexedConsensusRequest implements IConsensusRequest {
     return searchIndex;
   }
 
+  /**
+   * Returns the sync index held by this request.
+   *
+   * @return the stored sync index; {@code -1} when the request was created through the
+   *     two-argument constructor
+   */
+  public long getSyncIndex() {
+    return this.syncIndex;
+  }
+
   @Override
   public boolean equals(Object o) {
     if (this == o) {
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
index 3e8cbbfb3b..8b21fa920e 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
@@ -186,8 +186,9 @@ public class MultiLeaderServerImpl {
   }
 
   public IndexedConsensusRequest buildIndexedConsensusRequestForRemoteRequest(
-      List<IConsensusRequest> requests) {
-    return new IndexedConsensusRequest(ConsensusReqReader.DEFAULT_SEARCH_INDEX, requests);
+      long syncIndex, List<IConsensusRequest> requests) {
+    return new IndexedConsensusRequest(
+        ConsensusReqReader.DEFAULT_SEARCH_INDEX, syncIndex, requests);
   }
 
   /**
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
index 69375872c1..b24be2a1e6 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
@@ -81,8 +81,11 @@ public class MultiLeaderRPCServiceProcessor implements MultiLeaderConsensusIServ
           if (batch.getSearchIndex() != currentSearchIndex) {
             statuses.add(
                 impl.getStateMachine()
-                    .write(impl.buildIndexedConsensusRequestForRemoteRequest(consensusRequests)));
+                    .write(
+                        impl.buildIndexedConsensusRequestForRemoteRequest(
+                            currentSearchIndex, consensusRequests)));
             consensusRequests = new ArrayList<>();
+            currentSearchIndex = batch.getSearchIndex();
           }
           consensusRequests.add(request);
         }
@@ -90,7 +93,9 @@ public class MultiLeaderRPCServiceProcessor implements MultiLeaderConsensusIServ
         if (!consensusRequests.isEmpty()) {
           statuses.add(
               impl.getStateMachine()
-                  .write(impl.buildIndexedConsensusRequestForRemoteRequest(consensusRequests)));
+                  .write(
+                      impl.buildIndexedConsensusRequestForRemoteRequest(
+                          currentSearchIndex, consensusRequests)));
         }
       }
       logger.debug("Execute TSyncLogReq for {} with result {}", req.consensusGroupId, statuses);
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
index 285ad31d74..b56586ee80 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
@@ -42,12 +42,14 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowsOfOneDevic
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertTabletNode;
 import org.apache.iotdb.rpc.TSStatusCode;
 
+import org.jetbrains.annotations.NotNull;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.PriorityQueue;
 
 public class DataRegionStateMachine extends BaseStateMachine {
 
@@ -58,8 +60,12 @@ public class DataRegionStateMachine extends BaseStateMachine {
 
   private DataRegion region;
 
+  private static final int MAX_REQUEST_CACHE_SIZE = 50;
+  private PriorityQueue<InsertNodeWrapper> requestCache;
+
   public DataRegionStateMachine(DataRegion region) {
     this.region = region;
+    this.requestCache = new PriorityQueue<>();
   }
 
   @Override
@@ -104,6 +110,38 @@ public class DataRegionStateMachine extends BaseStateMachine {
     }
   }
 
+  /**
+   * Buffers {@code insertNode} in {@code requestCache}, ordered by {@code syncIndex}, and hands
+   * back the entry with the smallest sync index once the cache has reached {@code
+   * MAX_REQUEST_CACHE_SIZE} entries.
+   *
+   * <p>The method is {@code synchronized} because {@link java.util.PriorityQueue} is not
+   * thread-safe and the add/size/poll sequence must be atomic. NOTE(review): this assumes {@code
+   * write} can be invoked concurrently (e.g. by RPC handler threads serving several peers) --
+   * confirm against the consensus layer's threading model.
+   *
+   * @param syncIndex ordering key for the buffered node
+   * @param insertNode the node to buffer
+   * @return the buffered node with the smallest sync index, or {@code null} while the cache holds
+   *     fewer than {@code MAX_REQUEST_CACHE_SIZE} entries (nothing is released in that case)
+   */
+  private synchronized InsertNode cacheAndGetLatestInsertNode(
+      long syncIndex, InsertNode insertNode) {
+    requestCache.add(new InsertNodeWrapper(syncIndex, insertNode));
+    if (requestCache.size() < MAX_REQUEST_CACHE_SIZE) {
+      // Still filling up: the caller receives null and must not execute anything yet.
+      return null;
+    }
+    // Cache is full: release the entry with the lowest sync index.
+    return requestCache.poll().getInsertNode();
+  }
+
+  /**
+   * Pairs an {@link InsertNode} with the sync index it arrived with. Instances are ordered by
+   * ascending sync index, so a {@code PriorityQueue} of wrappers polls the smallest index first.
+   */
+  private static class InsertNodeWrapper implements Comparable<InsertNodeWrapper> {
+    /** Ordering key of this entry. */
+    private final long syncIndex;
+
+    /** The wrapped insert node. */
+    private final InsertNode insertNode;
+
+    public InsertNodeWrapper(long syncIndex, InsertNode insertNode) {
+      this.insertNode = insertNode;
+      this.syncIndex = syncIndex;
+    }
+
+    public long getSyncIndex() {
+      return this.syncIndex;
+    }
+
+    public InsertNode getInsertNode() {
+      return this.insertNode;
+    }
+
+    /** Orders wrappers by ascending sync index. */
+    @Override
+    public int compareTo(@NotNull InsertNodeWrapper other) {
+      return Long.compare(this.syncIndex, other.syncIndex);
+    }
+  }
+
   @Override
   public TSStatus write(IConsensusRequest request) {
     PlanNode planNode;
@@ -117,7 +155,13 @@ public class DataRegionStateMachine extends BaseStateMachine {
           innerNode.setSearchIndex(indexedRequest.getSearchIndex());
           insertNodes.add(innerNode);
         }
-        planNode = mergeInsertNodes(insertNodes);
+        planNode =
+            cacheAndGetLatestInsertNode(
+                indexedRequest.getSyncIndex(), mergeInsertNodes(insertNodes));
+        // TODO: tmp way to do the test
+        if (planNode == null) {
+          return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+        }
       } else {
         planNode = getPlanNode(request);
       }