You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/08/01 10:15:25 UTC

[iotdb] branch ml_0729_test_exp1 created (now 336e861b18)

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a change to branch ml_0729_test_exp1
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at 336e861b18 change to Async Write exp

This branch includes the following new commits:

     new 336e861b18 change to Async Write exp

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: change to Async Write exp

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch ml_0729_test_exp1
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 336e861b18738f289ce7197274687452b259fe18
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Mon Aug 1 18:15:08 2022 +0800

    change to Async Write exp
---
 .../org/apache/iotdb/consensus/IStateMachine.java  |   7 ++
 .../service/MultiLeaderRPCServiceProcessor.java    |  23 ++---
 .../consensus/statemachine/BaseStateMachine.java   |  12 +++
 .../statemachine/DataRegionStateMachine.java       | 115 +++++++++++----------
 .../plan/planner/plan/node/write/InsertNode.java   |  10 --
 5 files changed, 88 insertions(+), 79 deletions(-)

diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java b/consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java
index 203b7d2a6b..3b5e6b1796 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java
@@ -25,6 +25,10 @@ import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import org.apache.iotdb.consensus.common.DataSet;
 import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
+import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
+import org.apache.iotdb.consensus.multileader.thrift.TSyncLogRes;
+
+import org.apache.thrift.async.AsyncMethodCallback;
 
 import javax.annotation.concurrent.ThreadSafe;
 
@@ -41,6 +45,9 @@ public interface IStateMachine {
 
   void stop();
 
+  void multiLeaderWriteAsync(
+      List<IndexedConsensusRequest> requests, AsyncMethodCallback<TSyncLogRes> resultHandler);
+
   /**
    * apply a write-request from user
    *
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
index b24be2a1e6..157b8b0207 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
+import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
 import org.apache.iotdb.consensus.common.request.MultiLeaderConsensusRequest;
 import org.apache.iotdb.consensus.multileader.MultiLeaderConsensus;
 import org.apache.iotdb.consensus.multileader.MultiLeaderServerImpl;
@@ -38,6 +39,7 @@ import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.LinkedList;
 import java.util.List;
 
 public class MultiLeaderRPCServiceProcessor implements MultiLeaderConsensusIService.AsyncIface {
@@ -67,7 +69,7 @@ public class MultiLeaderRPCServiceProcessor implements MultiLeaderConsensusIServ
         resultHandler.onComplete(new TSyncLogRes(Collections.singletonList(status)));
         return;
       }
-      List<TSStatus> statuses = new ArrayList<>();
+      List<IndexedConsensusRequest> indexedConsensusRequests = new LinkedList<>();
       // We use synchronized to ensure atomicity of executing multiple logs
       if (!req.getBatches().isEmpty()) {
         List<IConsensusRequest> consensusRequests = new ArrayList<>();
@@ -79,11 +81,9 @@ public class MultiLeaderRPCServiceProcessor implements MultiLeaderConsensusIServ
                   : new ByteBufferConsensusRequest(batch.data);
           // merge TLogBatch with same search index into one request
           if (batch.getSearchIndex() != currentSearchIndex) {
-            statuses.add(
-                impl.getStateMachine()
-                    .write(
-                        impl.buildIndexedConsensusRequestForRemoteRequest(
-                            currentSearchIndex, consensusRequests)));
+            indexedConsensusRequests.add(
+                impl.buildIndexedConsensusRequestForRemoteRequest(
+                    currentSearchIndex, consensusRequests));
             consensusRequests = new ArrayList<>();
             currentSearchIndex = batch.getSearchIndex();
           }
@@ -91,15 +91,12 @@ public class MultiLeaderRPCServiceProcessor implements MultiLeaderConsensusIServ
         }
         // write last request
         if (!consensusRequests.isEmpty()) {
-          statuses.add(
-              impl.getStateMachine()
-                  .write(
-                      impl.buildIndexedConsensusRequestForRemoteRequest(
-                          currentSearchIndex, consensusRequests)));
+          indexedConsensusRequests.add(
+              impl.buildIndexedConsensusRequestForRemoteRequest(
+                  currentSearchIndex, consensusRequests));
         }
+        impl.getStateMachine().multiLeaderWriteAsync(indexedConsensusRequests, resultHandler);
       }
-      logger.debug("Execute TSyncLogReq for {} with result {}", req.consensusGroupId, statuses);
-      resultHandler.onComplete(new TSyncLogRes(statuses));
     } catch (Exception e) {
       resultHandler.onError(e);
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/BaseStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/BaseStateMachine.java
index 32333f26e9..245a44adc2 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/BaseStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/BaseStateMachine.java
@@ -22,19 +22,31 @@ package org.apache.iotdb.db.consensus.statemachine;
 import org.apache.iotdb.consensus.IStateMachine;
 import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
+import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
 import org.apache.iotdb.consensus.common.request.MultiLeaderConsensusRequest;
+import org.apache.iotdb.consensus.multileader.thrift.TSyncLogRes;
 import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
 import org.apache.iotdb.db.wal.buffer.WALEntry;
 
+import org.apache.thrift.async.AsyncMethodCallback;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.ws.rs.NotSupportedException;
+
+import java.util.List;
+
 public abstract class BaseStateMachine implements IStateMachine, IStateMachine.EventApi {
 
   private static final Logger logger = LoggerFactory.getLogger(BaseStateMachine.class);
 
+  public void multiLeaderWriteAsync(
+      List<IndexedConsensusRequest> requests, AsyncMethodCallback<TSyncLogRes> resultHandler) {
+    throw new NotSupportedException();
+  }
+
   protected FragmentInstance getFragmentInstance(IConsensusRequest request) {
     FragmentInstance instance;
     if (request instanceof ByteBufferConsensusRequest) {
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
index 0fd9dc5a76..32847e2eb4 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
@@ -26,7 +26,7 @@ import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.consensus.common.DataSet;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
-import org.apache.iotdb.consensus.multileader.wal.ConsensusReqReader;
+import org.apache.iotdb.consensus.multileader.thrift.TSyncLogRes;
 import org.apache.iotdb.consensus.multileader.wal.GetConsensusReqReaderPlan;
 import org.apache.iotdb.db.consensus.statemachine.visitor.DataExecutionVisitor;
 import org.apache.iotdb.db.engine.StorageEngineV2;
@@ -44,13 +44,14 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowsOfOneDevic
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertTabletNode;
 import org.apache.iotdb.rpc.TSStatusCode;
 
+import org.apache.thrift.async.AsyncMethodCallback;
 import org.jetbrains.annotations.NotNull;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.util.ArrayList;
-import java.util.Comparator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.PriorityQueue;
 
@@ -65,11 +66,11 @@ public class DataRegionStateMachine extends BaseStateMachine {
 
   private static final int MAX_REQUEST_CACHE_SIZE = 5;
   private static final long CACHE_WINDOW_TIME_IN_MS = 10_000;
-  private final PriorityQueue<InsertNode> requestCache;
+  private final PriorityQueue<InsertNodeWrapper> requestCache;
 
   public DataRegionStateMachine(DataRegion region) {
     this.region = region;
-    this.requestCache = new PriorityQueue<>(Comparator.comparingLong(InsertNode::getSyncIndex));
+    this.requestCache = new PriorityQueue<>();
   }
 
   @Override
@@ -114,56 +115,31 @@ public class DataRegionStateMachine extends BaseStateMachine {
     }
   }
 
-  private InsertNode cacheAndGetLatestInsertNode(long syncIndex, InsertNode insertNode) {
-    insertNode.setSyncIndex(syncIndex);
+  private InsertNodeWrapper cacheAndGetLatestInsertNode(
+      long syncIndex,
+      List<InsertNode> insertNodes,
+      AsyncMethodCallback<TSyncLogRes> resultHandler) {
     synchronized (requestCache) {
-      requestCache.add(insertNode);
-      requestCache.notifyAll();
-      while (!(requestCache.size() >= MAX_REQUEST_CACHE_SIZE
-          && requestCache.peek().getSyncIndex() == syncIndex)) {
-        try {
-          requestCache.wait(CACHE_WINDOW_TIME_IN_MS);
-        } catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
-        }
+      requestCache.add(new InsertNodeWrapper(syncIndex, insertNodes, resultHandler));
+      if (requestCache.size() == MAX_REQUEST_CACHE_SIZE) {
+        return requestCache.poll();
       }
-      requestCache.notifyAll();
-      InsertNode nextNode = requestCache.poll();
-      //      logger.info("queue size {}, syncIndex = {}", requestCache.size(),
-      // nextNode.getSyncIndex());
-      return nextNode;
+      return null;
     }
   }
 
-  //  private InsertNode cacheAndGetLatestInsertNode2(long syncIndex, InsertNode insertNode) {
-  //    insertNode.setSyncIndex(syncIndex);
-  //    synchronized (requestCache) {
-  //      requestCache.add(insertNode);
-  //      if (requestCache.size() == MAX_REQUEST_CACHE_SIZE) {
-  //        if (insertNode == requestCache.peek()) {
-  //          return requestCache.poll();
-  //        } else {
-  //          requestCache.poll().notify();
-  //        }
-  //      }
-  //    }
-  //
-  //    synchronized (requestCache) {
-  //      try {
-  //        insertNode.wait(CACHE_WINDOW_TIME_IN_MS);
-  //      } catch (InterruptedException e) {
-  //        Thread.currentThread().interrupt();
-  //      }
-  //    }
-  //  }
-
   private static class InsertNodeWrapper implements Comparable<InsertNodeWrapper> {
     private final long syncIndex;
-    private final InsertNode insertNode;
+    private final List<InsertNode> insertNodes;
+    private final AsyncMethodCallback<TSyncLogRes> resultHandler;
 
-    public InsertNodeWrapper(long syncIndex, InsertNode insertNode) {
+    public InsertNodeWrapper(
+        long syncIndex,
+        List<InsertNode> insertNode,
+        AsyncMethodCallback<TSyncLogRes> resultHandler) {
       this.syncIndex = syncIndex;
-      this.insertNode = insertNode;
+      this.insertNodes = insertNode;
+      this.resultHandler = resultHandler;
     }
 
     @Override
@@ -175,8 +151,43 @@ public class DataRegionStateMachine extends BaseStateMachine {
       return syncIndex;
     }
 
-    public InsertNode getInsertNode() {
-      return insertNode;
+    public List<InsertNode> getInsertNodes() {
+      return insertNodes;
+    }
+
+    public AsyncMethodCallback<TSyncLogRes> getResultHandler() {
+      return resultHandler;
+    }
+  }
+
+  public void multiLeaderWriteAsync(
+      List<IndexedConsensusRequest> requests, AsyncMethodCallback<TSyncLogRes> resultHandler) {
+    List<TSStatus> statuses = new LinkedList<>();
+    try {
+      List<InsertNode> insertNodesInAllRequests = new LinkedList<>();
+      for (IndexedConsensusRequest indexedRequest : requests) {
+        List<InsertNode> insertNodesInOneRequest =
+            new ArrayList<>(indexedRequest.getRequests().size());
+        for (IConsensusRequest req : indexedRequest.getRequests()) {
+          // PlanNode in IndexedConsensusRequest should always be InsertNode
+          InsertNode innerNode = (InsertNode) getPlanNode(req);
+          innerNode.setSearchIndex(indexedRequest.getSearchIndex());
+          insertNodesInOneRequest.add(innerNode);
+        }
+        insertNodesInAllRequests.add(mergeInsertNodes(insertNodesInOneRequest));
+      }
+      InsertNodeWrapper insertNodeWrapper =
+          cacheAndGetLatestInsertNode(
+              requests.get(0).getSyncIndex(), insertNodesInAllRequests, resultHandler);
+      if (insertNodeWrapper != null) {
+        for (InsertNode insertNode : insertNodeWrapper.getInsertNodes()) {
+          statuses.add(write(insertNode));
+        }
+        insertNodeWrapper.resultHandler.onComplete(new TSyncLogRes(statuses));
+      }
+    } catch (IllegalArgumentException e) {
+      logger.error(e.getMessage(), e);
+      resultHandler.onError(e);
     }
   }
 
@@ -193,15 +204,7 @@ public class DataRegionStateMachine extends BaseStateMachine {
           innerNode.setSearchIndex(indexedRequest.getSearchIndex());
           insertNodes.add(innerNode);
         }
-        if (indexedRequest.getSearchIndex() == ConsensusReqReader.DEFAULT_SEARCH_INDEX) {
-          long cacheRequestStartTime = System.nanoTime();
-          planNode =
-              cacheAndGetLatestInsertNode(
-                  indexedRequest.getSyncIndex(), mergeInsertNodes(insertNodes));
-          StepTracker.trace("cacheAndQueueRequest", cacheRequestStartTime, System.nanoTime());
-        } else {
-          planNode = mergeInsertNodes(insertNodes);
-        }
+        planNode = mergeInsertNodes(insertNodes);
       } else {
         planNode = getPlanNode(request);
       }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java
index 787dfbf9ad..f5b5bddb52 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java
@@ -77,8 +77,6 @@ public abstract class InsertNode extends WritePlanNode {
    */
   protected long searchIndex = NO_CONSENSUS_INDEX;
 
-  protected long syncIndex = NO_CONSENSUS_INDEX;
-
   /** Physical address of data region after splitting */
   protected TRegionReplicaSet dataRegionReplicaSet;
 
@@ -155,14 +153,6 @@ public abstract class InsertNode extends WritePlanNode {
     return searchIndex;
   }
 
-  public void setSyncIndex(long syncIndex) {
-    this.syncIndex = syncIndex;
-  }
-
-  public long getSyncIndex() {
-    return syncIndex;
-  }
-
   /** Search index should start from 1 */
   public void setSearchIndex(long searchIndex) {
     this.searchIndex = searchIndex;