You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/08/09 08:31:00 UTC

[iotdb] branch ml_0808_test_exp1_parallel updated: refine codes and add more comments

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch ml_0808_test_exp1_parallel
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/ml_0808_test_exp1_parallel by this push:
     new e6d2cf0bc9 refine codes and add more comments
e6d2cf0bc9 is described below

commit e6d2cf0bc9d1156d934638b3e8b4e310e85892b9
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Tue Aug 9 16:30:47 2022 +0800

    refine codes and add more comments
---
 .../iotdb/consensus/config/MultiLeaderConfig.java  |  4 +-
 .../multileader/MultiLeaderServerImpl.java         |  7 ++-
 .../multileader/logdispatcher/LogDispatcher.java   | 14 ++---
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  2 +-
 .../consensus/statemachine/BaseStateMachine.java   | 28 ++++------
 .../statemachine/DataRegionStateMachine.java       | 23 ++++----
 .../db/mpp/plan/execution/QueryExecution.java      | 61 ++++++++++++----------
 .../distribution/WriteFragmentParallelPlanner.java | 14 -----
 .../plan/planner/plan/node/write/InsertNode.java   | 10 ----
 9 files changed, 70 insertions(+), 93 deletions(-)

diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java b/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java
index 0355485847..9b4a72a4f1 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java
@@ -132,7 +132,7 @@ public class MultiLeaderConfig {
 
     public static class Builder {
       private int rpcSelectorThreadNum = 1;
-      private int rpcMinConcurrentClientNum = 16;
+      private int rpcMinConcurrentClientNum = Runtime.getRuntime().availableProcessors();
       private int rpcMaxConcurrentClientNum = 65535;
       private int thriftServerAwaitTimeForStopService = 60;
       private boolean isRpcThriftCompressionEnabled = false;
@@ -249,6 +249,8 @@ public class MultiLeaderConfig {
     public static class Builder {
       private int maxPendingRequestNumPerNode = 600;
       private int maxRequestPerBatch = 30;
+      // (IMPORTANT) The value of this variable should be the same as MAX_REQUEST_CACHE_SIZE
+      // in DataRegionStateMachine
       private int maxPendingBatch = 5;
       private int maxWaitingTimeForAccumulatingBatchInMs = 500;
       private long basicRetryWaitTimeMs = TimeUnit.MILLISECONDS.toMillis(100);
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
index 096cc160a0..b0cfba2718 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
@@ -121,8 +121,13 @@ public class MultiLeaderServerImpl {
       }
       // TODO wal and memtable
       TSStatus result = stateMachine.write(indexedConsensusRequest);
-      long offerStartTime = System.nanoTime();
       if (result.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        // The index is used when constructing a batch in LogDispatcher. If its value
+        // increases but the corresponding request does not exist or is not put into
+        // the queue, the dispatcher will try to find the request in the WAL. This behavior
+        // is not expected and will slow down batch preparation.
+        // So we need the lock to ensure that `offer()` and `incrementAndGet()` are
+        // performed atomically.
         synchronized (index) {
           logDispatcher.offer(indexedConsensusRequest);
           index.incrementAndGet();
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
index 5f29665346..8938d8d03f 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
@@ -145,7 +145,6 @@ public class LogDispatcher {
 
     private ConsensusReqReader.ReqIterator walEntryiterator;
     private long iteratorIndex = 1;
-    private int queueCount = 0;
 
     public LogDispatcherThread(Peer peer, MultiLeaderConfig config) {
       this.peer = peer;
@@ -244,7 +243,6 @@ public class LogDispatcher {
               bufferedRequest,
               config.getReplication().getMaxRequestPerBatch() - bufferedRequest.size());
           maxIndexWhenBufferedRequestEmpty = impl.getIndex() + 1;
-          //            impl.getIndexObject().notifyAll();
         }
         // remove all request that searchIndex < startIndex
         Iterator<IndexedConsensusRequest> iterator = bufferedRequest.iterator();
@@ -260,7 +258,8 @@ public class LogDispatcher {
       // This condition will be executed in several scenarios:
       // 1. restart
       // 2. The getBatch() is invoked immediately at the moment the PendingRequests are consumed
-      // up.
+      // up. To prevent inconsistency here, we synchronize when calculating the value of
+      // `maxIndexWhenBufferedRequestEmpty`
       if (bufferedRequest.isEmpty()) {
         endIndex = constructBatchFromWAL(startIndex, maxIndexWhenBufferedRequestEmpty, logBatches);
         batch = new PendingBatch(startIndex, endIndex, logBatches);
@@ -270,8 +269,7 @@ public class LogDispatcher {
         // Notice that prev searchIndex >= startIndex
         Iterator<IndexedConsensusRequest> iterator = bufferedRequest.iterator();
         IndexedConsensusRequest prev = iterator.next();
-        // Prevents gap between logs. For example, some requests are not written into the queue
-        // when
+        // Prevents gap between logs. For example, some requests are not written into the queue when
         // the queue is full. In this case, requests need to be loaded from the WAL
         endIndex = constructBatchFromWAL(startIndex, prev.getSearchIndex(), logBatches);
         if (logBatches.size() == config.getReplication().getMaxRequestPerBatch()) {
@@ -353,11 +351,6 @@ public class LogDispatcher {
           && logBatches.size() < config.getReplication().getMaxRequestPerBatch()) {
         logger.debug("construct from WAL for one Entry, index : {}", currentIndex);
         try {
-          logger.debug(
-              "{} : before wait pendingRequest Size: {}, bufferedRequest size: {}",
-              impl.getThisNode().getGroupId(),
-              pendingRequest.size(),
-              bufferedRequest.size());
           walEntryiterator.waitForNextReady();
         } catch (InterruptedException e) {
           Thread.currentThread().interrupt();
@@ -368,7 +361,6 @@ public class LogDispatcher {
         iteratorIndex = currentIndex;
         for (IConsensusRequest innerRequest : data.getRequests()) {
           logBatches.add(new TLogBatch(innerRequest.serializeToByteBuffer(), currentIndex, true));
-          logger.info("read one entry from WAL for dispatching: {}", data.getSearchIndex());
         }
         if (currentIndex == maxIndex - 1) {
           break;
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 6d54339800..cb2583e4d9 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -118,7 +118,7 @@ public class IoTDBConfig {
   private int rpcSelectorThreadNum = 1;
 
   /** Min concurrent client number */
-  private int rpcMinConcurrentClientNum = 16;
+  private int rpcMinConcurrentClientNum = Runtime.getRuntime().availableProcessors();
 
   /** Max concurrent client number */
   private int rpcMaxConcurrentClientNum = 65535;
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/BaseStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/BaseStateMachine.java
index 3bd74bd91a..32333f26e9 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/BaseStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/BaseStateMachine.java
@@ -19,7 +19,6 @@
 
 package org.apache.iotdb.db.consensus.statemachine;
 
-import org.apache.iotdb.commons.StepTracker;
 import org.apache.iotdb.consensus.IStateMachine;
 import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
@@ -50,22 +49,17 @@ public abstract class BaseStateMachine implements IStateMachine, IStateMachine.E
   }
 
   protected PlanNode getPlanNode(IConsensusRequest request) {
-    long startTime = System.nanoTime();
-    try {
-      PlanNode node;
-      if (request instanceof ByteBufferConsensusRequest) {
-        node = PlanNodeType.deserialize(request.serializeToByteBuffer());
-      } else if (request instanceof MultiLeaderConsensusRequest) {
-        node = WALEntry.deserializeInsertNode(request.serializeToByteBuffer());
-      } else if (request instanceof PlanNode) {
-        node = (PlanNode) request;
-      } else {
-        logger.error("Unexpected IConsensusRequest : {}", request);
-        throw new IllegalArgumentException("Unexpected IConsensusRequest!");
-      }
-      return node;
-    } finally {
-      StepTracker.trace("deserializePlanNode", startTime, System.nanoTime());
+    PlanNode node;
+    if (request instanceof ByteBufferConsensusRequest) {
+      node = PlanNodeType.deserialize(request.serializeToByteBuffer());
+    } else if (request instanceof MultiLeaderConsensusRequest) {
+      node = WALEntry.deserializeInsertNode(request.serializeToByteBuffer());
+    } else if (request instanceof PlanNode) {
+      node = (PlanNode) request;
+    } else {
+      logger.error("Unexpected IConsensusRequest : {}", request);
+      throw new IllegalArgumentException("Unexpected IConsensusRequest!");
     }
+    return node;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
index 4b7a416740..1af648d335 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
@@ -20,7 +20,6 @@
 package org.apache.iotdb.db.consensus.statemachine;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.commons.StepTracker;
 import org.apache.iotdb.commons.consensus.DataRegionId;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.consensus.common.DataSet;
@@ -44,7 +43,6 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowsOfOneDevic
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertTabletNode;
 import org.apache.iotdb.rpc.TSStatusCode;
 
-import org.jetbrains.annotations.NotNull;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -122,6 +120,11 @@ public class DataRegionStateMachine extends BaseStateMachine {
     }
   }
 
+  /**
+   * This method is used for writes from MultiLeader SyncLog. It keeps the write order on the
+   * follower the same as on the leader. Besides guaranteeing order, it also allows the
+   * deserialization of PlanNode to run concurrently.
+   */
   private TSStatus cacheAndInsertLatestNode(InsertNodeWrapper insertNodeWrapper) {
     queueLock.lock();
     try {
@@ -133,11 +136,18 @@ public class DataRegionStateMachine extends BaseStateMachine {
         queueSortCondition.signalAll();
       }
       while (true) {
+        // If the current InsertNode is the next target InsertNode, write it
         if (insertNodeWrapper.getStartSyncIndex() == nextSyncIndex) {
           requestCache.remove(insertNodeWrapper);
           nextSyncIndex = insertNodeWrapper.getEndSyncIndex() + 1;
           break;
         }
+        // If no write thread hits nextSyncIndex and the heap is full, write the request at the
+        // head of the heap. This keeps the overall write correct when nextSyncIndex is not set.
+        // To reduce complexity, we do not persist the value of nextSyncIndex.
+        // There are some cases in which nextSyncIndex is not set:
+        //   1. When the system was just started
+        //   2. When some exception occurs during SyncLog
         if (requestCache.size() == MAX_REQUEST_CACHE_SIZE
             && requestCache.peek().getStartSyncIndex() == insertNodeWrapper.getStartSyncIndex()) {
           requestCache.remove();
@@ -199,7 +209,7 @@ public class DataRegionStateMachine extends BaseStateMachine {
     }
 
     @Override
-    public int compareTo(@NotNull InsertNodeWrapper o) {
+    public int compareTo(InsertNodeWrapper o) {
       return Long.compare(startSyncIndex, o.startSyncIndex);
     }
 
@@ -314,12 +324,7 @@ public class DataRegionStateMachine extends BaseStateMachine {
   }
 
   protected TSStatus write(PlanNode planNode) {
-    long startTime = System.nanoTime();
-    try {
-      return planNode.accept(new DataExecutionVisitor(), region);
-    } finally {
-      StepTracker.trace("StateMachineWrite", startTime, System.nanoTime());
-    }
+    return planNode.accept(new DataExecutionVisitor(), region);
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
index 7c2bc8c9bd..842352a7b6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
 import org.apache.iotdb.commons.exception.IoTDBException;
+import org.apache.iotdb.commons.utils.StatusUtils;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.query.QueryTimeoutRuntimeException;
@@ -52,6 +53,9 @@ import org.apache.iotdb.db.mpp.plan.scheduler.ClusterScheduler;
 import org.apache.iotdb.db.mpp.plan.scheduler.IScheduler;
 import org.apache.iotdb.db.mpp.plan.scheduler.StandaloneScheduler;
 import org.apache.iotdb.db.mpp.plan.statement.Statement;
+import org.apache.iotdb.db.mpp.plan.statement.crud.InsertBaseStatement;
+import org.apache.iotdb.db.mpp.plan.statement.crud.InsertMultiTabletsStatement;
+import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowsStatement;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
@@ -63,6 +67,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CancellationException;
@@ -462,35 +467,33 @@ public class QueryExecution implements IQueryExecution {
     }
 
     // collect redirect info to client for writing
-    //    if (analysis.getStatement() instanceof InsertBaseStatement) {
-    //      InsertBaseStatement insertStatement = (InsertBaseStatement) analysis.getStatement();
-    //      List<TEndPoint> redirectNodeList;
-    //      if (config.isClusterMode()) {
-    //        redirectNodeList =
-    // insertStatement.collectRedirectInfo(analysis.getDataPartitionInfo());
-    //      } else {
-    //        redirectNodeList = Collections.emptyList();
-    //      }
-    //      if (insertStatement instanceof InsertRowsStatement
-    //          || insertStatement instanceof InsertMultiTabletsStatement) {
-    //        // multiple devices
-    //        if (statusCode == TSStatusCode.SUCCESS_STATUS) {
-    //          List<TSStatus> subStatus = new ArrayList<>();
-    //          tsstatus.setCode(TSStatusCode.NEED_REDIRECTION.getStatusCode());
-    //          for (TEndPoint endPoint : redirectNodeList) {
-    //            subStatus.add(
-    //
-    // StatusUtils.getStatus(TSStatusCode.NEED_REDIRECTION).setRedirectNode(endPoint));
-    //          }
-    //          tsstatus.setSubStatus(subStatus);
-    //        }
-    //      } else {
-    //        // single device
-    //        if (config.isClusterMode()) {
-    //          tsstatus.setRedirectNode(redirectNodeList.get(0));
-    //        }
-    //      }
-    //    }
+    if (analysis.getStatement() instanceof InsertBaseStatement) {
+      InsertBaseStatement insertStatement = (InsertBaseStatement) analysis.getStatement();
+      List<TEndPoint> redirectNodeList;
+      if (config.isClusterMode()) {
+        redirectNodeList = insertStatement.collectRedirectInfo(analysis.getDataPartitionInfo());
+      } else {
+        redirectNodeList = Collections.emptyList();
+      }
+      if (insertStatement instanceof InsertRowsStatement
+          || insertStatement instanceof InsertMultiTabletsStatement) {
+        // multiple devices
+        if (statusCode == TSStatusCode.SUCCESS_STATUS) {
+          List<TSStatus> subStatus = new ArrayList<>();
+          tsstatus.setCode(TSStatusCode.NEED_REDIRECTION.getStatusCode());
+          for (TEndPoint endPoint : redirectNodeList) {
+            subStatus.add(
+                StatusUtils.getStatus(TSStatusCode.NEED_REDIRECTION).setRedirectNode(endPoint));
+          }
+          tsstatus.setSubStatus(subStatus);
+        }
+      } else {
+        // single device
+        if (config.isClusterMode()) {
+          tsstatus.setRedirectNode(redirectNodeList.get(0));
+        }
+      }
+    }
 
     return new ExecutionResult(context.getQueryId(), tsstatus);
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/WriteFragmentParallelPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/WriteFragmentParallelPlanner.java
index cfc17b45cb..4f043a05b9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/WriteFragmentParallelPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/WriteFragmentParallelPlanner.java
@@ -19,8 +19,6 @@
 
 package org.apache.iotdb.db.mpp.plan.planner.distribution;
 
-import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
-import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.db.mpp.common.MPPQueryContext;
 import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
 import org.apache.iotdb.db.mpp.plan.planner.IFragmentParallelPlaner;
@@ -66,20 +64,8 @@ public class WriteFragmentParallelPlanner implements IFragmentParallelPlaner {
               queryContext.getQueryType(),
               queryContext.getTimeOut());
       instance.setDataRegionAndHost(split.getRegionReplicaSet());
-      instance.setHostDataNode(fakeSelectDataNode(split.getRegionReplicaSet()));
       ret.add(instance);
     }
     return ret;
   }
-
-  private TDataNodeLocation fakeSelectDataNode(TRegionReplicaSet regionReplicaSet) {
-    String[] candidate = new String[] {"172.20.31.41", "172.20.31.42", "172.20.31.43"};
-    int targetIndex = regionReplicaSet.regionId.id % 3;
-    for (TDataNodeLocation location : regionReplicaSet.getDataNodeLocations()) {
-      if (location.internalEndPoint.getIp().equals(candidate[targetIndex])) {
-        return location;
-      }
-    }
-    return null;
-  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java
index b1420f9499..d9808da701 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java
@@ -77,8 +77,6 @@ public abstract class InsertNode extends WritePlanNode {
    */
   protected long searchIndex = NO_CONSENSUS_INDEX;
 
-  protected long syncIndex = NO_CONSENSUS_INDEX;
-
   /** Physical address of data region after splitting */
   protected TRegionReplicaSet dataRegionReplicaSet;
 
@@ -155,14 +153,6 @@ public abstract class InsertNode extends WritePlanNode {
     return searchIndex;
   }
 
-  public void setSyncIndex(long syncIndex) {
-    this.syncIndex = syncIndex;
-  }
-
-  public long getSyncIndex() {
-    return syncIndex;
-  }
-
   /** Search index should start from 1 */
   public void setSearchIndex(long searchIndex) {
     this.searchIndex = searchIndex;