You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/08/01 10:15:26 UTC

[iotdb] 01/01: Change multi-leader sync-log writes to async (experiment)

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch ml_0729_test_exp1
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 336e861b18738f289ce7197274687452b259fe18
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Mon Aug 1 18:15:08 2022 +0800

    Change multi-leader sync-log writes to async (experiment)
---
 .../org/apache/iotdb/consensus/IStateMachine.java  |   7 ++
 .../service/MultiLeaderRPCServiceProcessor.java    |  23 ++---
 .../consensus/statemachine/BaseStateMachine.java   |  12 +++
 .../statemachine/DataRegionStateMachine.java       | 115 +++++++++++----------
 .../plan/planner/plan/node/write/InsertNode.java   |  10 --
 5 files changed, 88 insertions(+), 79 deletions(-)

diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java b/consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java
index 203b7d2a6b..3b5e6b1796 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java
@@ -25,6 +25,10 @@ import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import org.apache.iotdb.consensus.common.DataSet;
 import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
+import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
+import org.apache.iotdb.consensus.multileader.thrift.TSyncLogRes;
+
+import org.apache.thrift.async.AsyncMethodCallback;
 
 import javax.annotation.concurrent.ThreadSafe;
 
@@ -41,6 +45,22 @@ public interface IStateMachine {
 
   void stop();
 
+  /**
+   * Applies a batch of requests replicated by multi-leader consensus, reporting the outcome through
+   * {@code resultHandler} instead of a return value.
+   *
+   * <p>Implementations must eventually complete {@code resultHandler} with either {@link
+   * AsyncMethodCallback#onComplete} or {@link AsyncMethodCallback#onError}; the RPC that delivered
+   * the batch is answered only through it. The default implementation rejects the call.
+   *
+   * @param requests replicated requests, grouped by search index
+   * @param resultHandler callback that reports the outcome of the batch
+   */
+  default void multiLeaderWriteAsync(
+      List<IndexedConsensusRequest> requests, AsyncMethodCallback<TSyncLogRes> resultHandler) {
+    throw new UnsupportedOperationException(
+        "multiLeaderWriteAsync is not supported by " + getClass().getSimpleName());
+  }
+
   /**
    * apply a write-request from user
    *
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
index b24be2a1e6..157b8b0207 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
+import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
 import org.apache.iotdb.consensus.common.request.MultiLeaderConsensusRequest;
 import org.apache.iotdb.consensus.multileader.MultiLeaderConsensus;
 import org.apache.iotdb.consensus.multileader.MultiLeaderServerImpl;
@@ -38,6 +39,7 @@ import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.LinkedList;
 import java.util.List;
 
 public class MultiLeaderRPCServiceProcessor implements MultiLeaderConsensusIService.AsyncIface {
@@ -67,7 +69,7 @@ public class MultiLeaderRPCServiceProcessor implements MultiLeaderConsensusIServ
         resultHandler.onComplete(new TSyncLogRes(Collections.singletonList(status)));
         return;
       }
-      List<TSStatus> statuses = new ArrayList<>();
+      List<IndexedConsensusRequest> indexedConsensusRequests = new LinkedList<>();
       // We use synchronized to ensure atomicity of executing multiple logs
       if (!req.getBatches().isEmpty()) {
         List<IConsensusRequest> consensusRequests = new ArrayList<>();
@@ -79,11 +81,9 @@ public class MultiLeaderRPCServiceProcessor implements MultiLeaderConsensusIServ
                   : new ByteBufferConsensusRequest(batch.data);
           // merge TLogBatch with same search index into one request
           if (batch.getSearchIndex() != currentSearchIndex) {
-            statuses.add(
-                impl.getStateMachine()
-                    .write(
-                        impl.buildIndexedConsensusRequestForRemoteRequest(
-                            currentSearchIndex, consensusRequests)));
+            indexedConsensusRequests.add(
+                impl.buildIndexedConsensusRequestForRemoteRequest(
+                    currentSearchIndex, consensusRequests));
             consensusRequests = new ArrayList<>();
             currentSearchIndex = batch.getSearchIndex();
           }
@@ -91,15 +91,16 @@ public class MultiLeaderRPCServiceProcessor implements MultiLeaderConsensusIServ
         }
         // write last request
         if (!consensusRequests.isEmpty()) {
-          statuses.add(
-              impl.getStateMachine()
-                  .write(
-                      impl.buildIndexedConsensusRequestForRemoteRequest(
-                          currentSearchIndex, consensusRequests)));
+          indexedConsensusRequests.add(
+              impl.buildIndexedConsensusRequestForRemoteRequest(
+                  currentSearchIndex, consensusRequests));
         }
+        impl.getStateMachine().multiLeaderWriteAsync(indexedConsensusRequests, resultHandler);
-      }
-      logger.debug("Execute TSyncLogReq for {} with result {}", req.consensusGroupId, statuses);
-      resultHandler.onComplete(new TSyncLogRes(statuses));
+      } else {
+        // The state machine is never invoked for an empty request, so acknowledge it here;
+        // otherwise the asynchronous RPC would never be answered.
+        resultHandler.onComplete(new TSyncLogRes(new ArrayList<>()));
+      }
     } catch (Exception e) {
       resultHandler.onError(e);
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/BaseStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/BaseStateMachine.java
index 32333f26e9..245a44adc2 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/BaseStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/BaseStateMachine.java
@@ -22,19 +22,35 @@ package org.apache.iotdb.db.consensus.statemachine;
 import org.apache.iotdb.consensus.IStateMachine;
 import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
+import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
 import org.apache.iotdb.consensus.common.request.MultiLeaderConsensusRequest;
+import org.apache.iotdb.consensus.multileader.thrift.TSyncLogRes;
 import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
 import org.apache.iotdb.db.wal.buffer.WALEntry;
 
+import org.apache.thrift.async.AsyncMethodCallback;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.List;
+
 public abstract class BaseStateMachine implements IStateMachine, IStateMachine.EventApi {
 
   private static final Logger logger = LoggerFactory.getLogger(BaseStateMachine.class);
 
+  /**
+   * Rejects multi-leader asynchronous batch writes. State machines that support them, such as
+   * DataRegionStateMachine, override this method.
+   */
+  @Override
+  public void multiLeaderWriteAsync(
+      List<IndexedConsensusRequest> requests, AsyncMethodCallback<TSyncLogRes> resultHandler) {
+    throw new UnsupportedOperationException(
+        "multiLeaderWriteAsync is not supported by " + getClass().getSimpleName());
+  }
+
   protected FragmentInstance getFragmentInstance(IConsensusRequest request) {
     FragmentInstance instance;
     if (request instanceof ByteBufferConsensusRequest) {
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
index 0fd9dc5a76..32847e2eb4 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
@@ -26,7 +26,7 @@ import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.consensus.common.DataSet;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
-import org.apache.iotdb.consensus.multileader.wal.ConsensusReqReader;
+import org.apache.iotdb.consensus.multileader.thrift.TSyncLogRes;
 import org.apache.iotdb.consensus.multileader.wal.GetConsensusReqReaderPlan;
 import org.apache.iotdb.db.consensus.statemachine.visitor.DataExecutionVisitor;
 import org.apache.iotdb.db.engine.StorageEngineV2;
@@ -44,13 +44,14 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowsOfOneDevic
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertTabletNode;
 import org.apache.iotdb.rpc.TSStatusCode;
 
+import org.apache.thrift.async.AsyncMethodCallback;
 import org.jetbrains.annotations.NotNull;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.util.ArrayList;
-import java.util.Comparator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.PriorityQueue;
 
@@ -65,11 +66,11 @@ public class DataRegionStateMachine extends BaseStateMachine {
 
   private static final int MAX_REQUEST_CACHE_SIZE = 5;
   private static final long CACHE_WINDOW_TIME_IN_MS = 10_000;
-  private final PriorityQueue<InsertNode> requestCache;
+  private final PriorityQueue<InsertNodeWrapper> requestCache;
 
   public DataRegionStateMachine(DataRegion region) {
     this.region = region;
-    this.requestCache = new PriorityQueue<>(Comparator.comparingLong(InsertNode::getSyncIndex));
+    this.requestCache = new PriorityQueue<>();
   }
 
   @Override
@@ -114,56 +115,41 @@ public class DataRegionStateMachine extends BaseStateMachine {
     }
   }
 
-  private InsertNode cacheAndGetLatestInsertNode(long syncIndex, InsertNode insertNode) {
-    insertNode.setSyncIndex(syncIndex);
+  /**
+   * Adds a batch to {@code requestCache}; once the cache holds MAX_REQUEST_CACHE_SIZE entries,
+   * removes and returns its head, i.e. the smallest entry by InsertNodeWrapper's natural ordering.
+   *
+   * <p>NOTE(review): there is no time-based flush (CACHE_WINDOW_TIME_IN_MS is not consulted here),
+   * so up to MAX_REQUEST_CACHE_SIZE - 1 batches stay parked, with their RPC handlers unanswered,
+   * until more batches arrive.
+   *
+   * @return the polled batch, or {@code null} while the cache is not full yet
+   */
+  private InsertNodeWrapper cacheAndGetLatestInsertNode(
+      long syncIndex,
+      List<InsertNode> insertNodes,
+      AsyncMethodCallback<TSyncLogRes> resultHandler) {
     synchronized (requestCache) {
-      requestCache.add(insertNode);
-      requestCache.notifyAll();
-      while (!(requestCache.size() >= MAX_REQUEST_CACHE_SIZE
-          && requestCache.peek().getSyncIndex() == syncIndex)) {
-        try {
-          requestCache.wait(CACHE_WINDOW_TIME_IN_MS);
-        } catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
-        }
+      requestCache.add(new InsertNodeWrapper(syncIndex, insertNodes, resultHandler));
+      if (requestCache.size() >= MAX_REQUEST_CACHE_SIZE) {
+        return requestCache.poll();
       }
-      requestCache.notifyAll();
-      InsertNode nextNode = requestCache.poll();
-      //      logger.info("queue size {}, syncIndex = {}", requestCache.size(),
-      // nextNode.getSyncIndex());
-      return nextNode;
+      return null;
     }
   }
 
-  //  private InsertNode cacheAndGetLatestInsertNode2(long syncIndex, InsertNode insertNode) {
-  //    insertNode.setSyncIndex(syncIndex);
-  //    synchronized (requestCache) {
-  //      requestCache.add(insertNode);
-  //      if (requestCache.size() == MAX_REQUEST_CACHE_SIZE) {
-  //        if (insertNode == requestCache.peek()) {
-  //          return requestCache.poll();
-  //        } else {
-  //          requestCache.poll().notify();
-  //        }
-  //      }
-  //    }
-  //
-  //    synchronized (requestCache) {
-  //      try {
-  //        insertNode.wait(CACHE_WINDOW_TIME_IN_MS);
-  //      } catch (InterruptedException e) {
-  //        Thread.currentThread().interrupt();
-  //      }
-  //    }
-  //  }
-
   private static class InsertNodeWrapper implements Comparable<InsertNodeWrapper> {
     private final long syncIndex;
-    private final InsertNode insertNode;
+    private final List<InsertNode> insertNodes;
+    private final AsyncMethodCallback<TSyncLogRes> resultHandler;
 
-    public InsertNodeWrapper(long syncIndex, InsertNode insertNode) {
+    public InsertNodeWrapper(
+        long syncIndex,
+        List<InsertNode> insertNodes,
+        AsyncMethodCallback<TSyncLogRes> resultHandler) {
       this.syncIndex = syncIndex;
-      this.insertNode = insertNode;
+      this.insertNodes = insertNodes;
+      this.resultHandler = resultHandler;
     }
 
     @Override
@@ -175,8 +161,71 @@ public class DataRegionStateMachine extends BaseStateMachine {
       return syncIndex;
     }
 
-    public InsertNode getInsertNode() {
-      return insertNode;
+    public List<InsertNode> getInsertNodes() {
+      return insertNodes;
+    }
+
+    public AsyncMethodCallback<TSyncLogRes> getResultHandler() {
+      return resultHandler;
+    }
+  }
+
+  /**
+   * Converts one replicated batch into insert nodes, parks it in {@code requestCache} and, once the
+   * cache is full, applies the batch polled from its head.
+   *
+   * <p>The polled batch may come from an earlier call, so the handler stored with it is the one
+   * completed; this call's {@code resultHandler} is completed when its own batch is polled.
+   *
+   * @param requests replicated requests of one sync-log RPC, grouped by search index
+   * @param resultHandler callback of the RPC that delivered {@code requests}
+   */
+  @Override
+  public void multiLeaderWriteAsync(
+      List<IndexedConsensusRequest> requests, AsyncMethodCallback<TSyncLogRes> resultHandler) {
+    if (requests.isEmpty()) {
+      // Nothing to cache or apply: answer now so the RPC does not hang.
+      resultHandler.onComplete(new TSyncLogRes(new LinkedList<>()));
+      return;
+    }
+    List<InsertNode> insertNodesInAllRequests = new LinkedList<>();
+    try {
+      for (IndexedConsensusRequest indexedRequest : requests) {
+        List<InsertNode> insertNodesInOneRequest =
+            new ArrayList<>(indexedRequest.getRequests().size());
+        for (IConsensusRequest req : indexedRequest.getRequests()) {
+          // PlanNode in IndexedConsensusRequest should always be InsertNode
+          InsertNode innerNode = (InsertNode) getPlanNode(req);
+          innerNode.setSearchIndex(indexedRequest.getSearchIndex());
+          insertNodesInOneRequest.add(innerNode);
+        }
+        insertNodesInAllRequests.add(mergeInsertNodes(insertNodesInOneRequest));
+      }
+    } catch (IllegalArgumentException e) {
+      // Nothing has been cached yet, so this call's own handler is the one to fail.
+      logger.error(e.getMessage(), e);
+      resultHandler.onError(e);
+      return;
+    }
+    InsertNodeWrapper insertNodeWrapper =
+        cacheAndGetLatestInsertNode(
+            requests.get(0).getSyncIndex(), insertNodesInAllRequests, resultHandler);
+    if (insertNodeWrapper == null) {
+      // Cache not full yet: this call is answered later, when its batch is polled.
+      return;
+    }
+    // From here on, outcomes belong to the polled batch, which may not be this call's.
+    AsyncMethodCallback<TSyncLogRes> polledHandler = insertNodeWrapper.getResultHandler();
+    List<TSStatus> statuses = new LinkedList<>();
+    try {
+      for (InsertNode insertNode : insertNodeWrapper.getInsertNodes()) {
+        statuses.add(write(insertNode));
+      }
+    } catch (Exception e) {
+      logger.error("Failed to apply a cached multi-leader write batch", e);
+      polledHandler.onError(e);
+      return;
     }
+    polledHandler.onComplete(new TSyncLogRes(statuses));
   }
 
@@ -193,15 +242,7 @@ public class DataRegionStateMachine extends BaseStateMachine {
           innerNode.setSearchIndex(indexedRequest.getSearchIndex());
           insertNodes.add(innerNode);
         }
-        if (indexedRequest.getSearchIndex() == ConsensusReqReader.DEFAULT_SEARCH_INDEX) {
-          long cacheRequestStartTime = System.nanoTime();
-          planNode =
-              cacheAndGetLatestInsertNode(
-                  indexedRequest.getSyncIndex(), mergeInsertNodes(insertNodes));
-          StepTracker.trace("cacheAndQueueRequest", cacheRequestStartTime, System.nanoTime());
-        } else {
-          planNode = mergeInsertNodes(insertNodes);
-        }
+        planNode = mergeInsertNodes(insertNodes);
       } else {
         planNode = getPlanNode(request);
       }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java
index 787dfbf9ad..f5b5bddb52 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java
@@ -77,8 +77,6 @@ public abstract class InsertNode extends WritePlanNode {
    */
   protected long searchIndex = NO_CONSENSUS_INDEX;
 
-  protected long syncIndex = NO_CONSENSUS_INDEX;
-
   /** Physical address of data region after splitting */
   protected TRegionReplicaSet dataRegionReplicaSet;
 
@@ -155,14 +153,6 @@ public abstract class InsertNode extends WritePlanNode {
     return searchIndex;
   }
 
-  public void setSyncIndex(long syncIndex) {
-    this.syncIndex = syncIndex;
-  }
-
-  public long getSyncIndex() {
-    return syncIndex;
-  }
-
   /** Search index should start from 1 */
   public void setSearchIndex(long searchIndex) {
     this.searchIndex = searchIndex;