You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/07/29 09:25:50 UTC

[iotdb] 01/01: make some change before test

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch ml_0729_test
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 2b8a4973142981756112ad1771a2c1ce8012ff22
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Fri Jul 29 17:25:37 2022 +0800

    make some change before test
---
 .../common/request/IndexedConsensusRequest.java    | 13 ++++++
 .../iotdb/consensus/config/MultiLeaderConfig.java  |  2 +-
 .../multileader/MultiLeaderServerImpl.java         |  5 ++-
 .../service/MultiLeaderRPCServiceProcessor.java    | 43 ++++++++++----------
 .../statemachine/DataRegionStateMachine.java       | 46 ++++++++++++++++++++++
 .../distribution/WriteFragmentParallelPlanner.java | 14 +++++++
 6 files changed, 100 insertions(+), 23 deletions(-)

diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java b/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java
index de3aca433b..9c5112fa2b 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java
@@ -29,11 +29,20 @@ public class IndexedConsensusRequest implements IConsensusRequest {
   /** we do not need to serialize these two fields as they are useless in other nodes. */
   private final long searchIndex;
 
+  private final long syncIndex;
   private final List<IConsensusRequest> requests;
 
   public IndexedConsensusRequest(long searchIndex, List<IConsensusRequest> requests) {
     this.searchIndex = searchIndex;
     this.requests = requests;
+    this.syncIndex = -1L;
+  }
+
+  public IndexedConsensusRequest(
+      long searchIndex, long syncIndex, List<IConsensusRequest> requests) {
+    this.searchIndex = searchIndex;
+    this.requests = requests;
+    this.syncIndex = syncIndex;
   }
 
   @Override
@@ -49,6 +58,10 @@ public class IndexedConsensusRequest implements IConsensusRequest {
     return searchIndex;
   }
 
+  public long getSyncIndex() {
+    return syncIndex;
+  }
+
   @Override
   public boolean equals(Object o) {
     if (this == o) {
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java b/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java
index e4f66f557b..9d4665e4d4 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java
@@ -249,7 +249,7 @@ public class MultiLeaderConfig {
     public static class Builder {
       private int maxPendingRequestNumPerNode = 200;
       private int maxRequestPerBatch = 40;
-      private int maxPendingBatch = 1;
+      private int maxPendingBatch = 5;
       private int maxWaitingTimeForAccumulatingBatchInMs = 500;
       private long basicRetryWaitTimeMs = TimeUnit.MILLISECONDS.toMillis(100);
       private long maxRetryWaitTimeMs = TimeUnit.SECONDS.toMillis(20);
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
index 3e8cbbfb3b..8b21fa920e 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
@@ -186,8 +186,9 @@ public class MultiLeaderServerImpl {
   }
 
   public IndexedConsensusRequest buildIndexedConsensusRequestForRemoteRequest(
-      List<IConsensusRequest> requests) {
-    return new IndexedConsensusRequest(ConsensusReqReader.DEFAULT_SEARCH_INDEX, requests);
+      long syncIndex, List<IConsensusRequest> requests) {
+    return new IndexedConsensusRequest(
+        ConsensusReqReader.DEFAULT_SEARCH_INDEX, syncIndex, requests);
   }
 
   /**
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
index 3fc63c7d99..b24be2a1e6 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
@@ -70,29 +70,32 @@ public class MultiLeaderRPCServiceProcessor implements MultiLeaderConsensusIServ
       List<TSStatus> statuses = new ArrayList<>();
       // We use synchronized to ensure atomicity of executing multiple logs
       if (!req.getBatches().isEmpty()) {
-        synchronized (impl.getStateMachine()) {
-          List<IConsensusRequest> consensusRequests = new ArrayList<>();
-          long currentSearchIndex = req.getBatches().get(0).getSearchIndex();
-          for (TLogBatch batch : req.getBatches()) {
-            IConsensusRequest request =
-                batch.isFromWAL()
-                    ? new MultiLeaderConsensusRequest(batch.data)
-                    : new ByteBufferConsensusRequest(batch.data);
-            // merge TLogBatch with same search index into one request
-            if (batch.getSearchIndex() != currentSearchIndex) {
-              statuses.add(
-                  impl.getStateMachine()
-                      .write(impl.buildIndexedConsensusRequestForRemoteRequest(consensusRequests)));
-              consensusRequests = new ArrayList<>();
-            }
-            consensusRequests.add(request);
-          }
-          // write last request
-          if (!consensusRequests.isEmpty()) {
+        List<IConsensusRequest> consensusRequests = new ArrayList<>();
+        long currentSearchIndex = req.getBatches().get(0).getSearchIndex();
+        for (TLogBatch batch : req.getBatches()) {
+          IConsensusRequest request =
+              batch.isFromWAL()
+                  ? new MultiLeaderConsensusRequest(batch.data)
+                  : new ByteBufferConsensusRequest(batch.data);
+          // merge TLogBatch with same search index into one request
+          if (batch.getSearchIndex() != currentSearchIndex) {
             statuses.add(
                 impl.getStateMachine()
-                    .write(impl.buildIndexedConsensusRequestForRemoteRequest(consensusRequests)));
+                    .write(
+                        impl.buildIndexedConsensusRequestForRemoteRequest(
+                            currentSearchIndex, consensusRequests)));
+            consensusRequests = new ArrayList<>();
+            currentSearchIndex = batch.getSearchIndex();
           }
+          consensusRequests.add(request);
+        }
+        // write last request
+        if (!consensusRequests.isEmpty()) {
+          statuses.add(
+              impl.getStateMachine()
+                  .write(
+                      impl.buildIndexedConsensusRequestForRemoteRequest(
+                          currentSearchIndex, consensusRequests)));
         }
       }
       logger.debug("Execute TSyncLogReq for {} with result {}", req.consensusGroupId, statuses);
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
index 285ad31d74..7863eb7e22 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
@@ -42,12 +42,14 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowsOfOneDevic
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertTabletNode;
 import org.apache.iotdb.rpc.TSStatusCode;
 
+import org.jetbrains.annotations.NotNull;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.PriorityQueue;
 
 public class DataRegionStateMachine extends BaseStateMachine {
 
@@ -58,8 +60,12 @@ public class DataRegionStateMachine extends BaseStateMachine {
 
   private DataRegion region;
 
+  private static final int MAX_REQUEST_CACHE_SIZE = 50;
+  private final PriorityQueue<InsertNodeWrapper> requestCache;
+
   public DataRegionStateMachine(DataRegion region) {
     this.region = region;
+    this.requestCache = new PriorityQueue<>();
   }
 
   @Override
@@ -104,6 +110,40 @@ public class DataRegionStateMachine extends BaseStateMachine {
     }
   }
 
+  private InsertNode cacheAndGetLatestInsertNode(long syncIndex, InsertNode insertNode) {
+    synchronized (requestCache) {
+      requestCache.add(new InsertNodeWrapper(syncIndex, insertNode));
+      if (requestCache.size() >= MAX_REQUEST_CACHE_SIZE) {
+        return requestCache.poll().getInsertNode();
+      } else {
+        return null;
+      }
+    }
+  }
+
+  private static class InsertNodeWrapper implements Comparable<InsertNodeWrapper> {
+    private final long syncIndex;
+    private final InsertNode insertNode;
+
+    public InsertNodeWrapper(long syncIndex, InsertNode insertNode) {
+      this.syncIndex = syncIndex;
+      this.insertNode = insertNode;
+    }
+
+    @Override
+    public int compareTo(@NotNull InsertNodeWrapper o) {
+      return Long.compare(syncIndex, o.syncIndex);
+    }
+
+    public long getSyncIndex() {
+      return syncIndex;
+    }
+
+    public InsertNode getInsertNode() {
+      return insertNode;
+    }
+  }
+
   @Override
   public TSStatus write(IConsensusRequest request) {
     PlanNode planNode;
@@ -118,6 +158,12 @@ public class DataRegionStateMachine extends BaseStateMachine {
           insertNodes.add(innerNode);
         }
         planNode = mergeInsertNodes(insertNodes);
+        //        planNode = cacheAndGetLatestInsertNode(
+        //                indexedRequest.getSyncIndex(), mergeInsertNodes(insertNodes));
+        // TODO: temporary shortcut for testing (a null planNode is reported as success)
+        if (planNode == null) {
+          return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+        }
       } else {
         planNode = getPlanNode(request);
       }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/WriteFragmentParallelPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/WriteFragmentParallelPlanner.java
index 4f043a05b9..f800fd49fb 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/WriteFragmentParallelPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/WriteFragmentParallelPlanner.java
@@ -19,6 +19,8 @@
 
 package org.apache.iotdb.db.mpp.plan.planner.distribution;
 
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.db.mpp.common.MPPQueryContext;
 import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
 import org.apache.iotdb.db.mpp.plan.planner.IFragmentParallelPlaner;
@@ -64,8 +66,20 @@ public class WriteFragmentParallelPlanner implements IFragmentParallelPlaner {
               queryContext.getQueryType(),
               queryContext.getTimeOut());
       instance.setDataRegionAndHost(split.getRegionReplicaSet());
+//      instance.setHostDataNode(fakeSelectDataNode(split.getRegionReplicaSet()));
       ret.add(instance);
     }
     return ret;
   }
+
+  private TDataNodeLocation fakeSelectDataNode(TRegionReplicaSet regionReplicaSet) {
+    String[] candidate = new String[] {"172.20.31.41", "172.20.31.42", "172.20.31.43"};
+    int targetIndex = regionReplicaSet.regionId.id % 3;
+    for (TDataNodeLocation location : regionReplicaSet.getDataNodeLocations()) {
+      if (location.internalEndPoint.getIp().equals(candidate[targetIndex])) {
+        return location;
+      }
+    }
+    return null;
+  }
 }