You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2023/03/09 08:07:20 UTC

[iotdb] 01/01: Implement all fragment instance sharing dop in one dataNode

This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a commit to branch FIDop
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 3023b793babd089c5c9173ca65d20bd197eb5c3b
Author: Alima777 <wx...@gmail.com>
AuthorDate: Thu Mar 9 16:06:57 2023 +0800

    Implement all fragment instance sharing dop in one dataNode
---
 .../fragment/FragmentInstanceContext.java          | 42 +++++++++++++++++++---
 .../fragment/FragmentInstanceManager.java          |  8 ++---
 .../plan/planner/LocalExecutionPlanContext.java    |  2 ++
 .../SimpleFragmentParallelPlanner.java             | 25 +++++++++----
 .../db/mpp/plan/planner/plan/FragmentInstance.java | 12 +++++++
 5 files changed, 72 insertions(+), 17 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
index f810bc1ddb..b299dd92ae 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.mpp.execution.fragment;
 
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.engine.storagegroup.IDataRegionForQuery;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
@@ -27,6 +28,7 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.idtable.IDTable;
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.common.SessionInfo;
+import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.control.FileReaderManager;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
@@ -75,6 +77,8 @@ public class FragmentInstanceContext extends QueryContext {
   // session info
   private SessionInfo sessionInfo;
 
+  private int degreeOfParallelism = 1;
+
   //    private final GcMonitor gcMonitor;
   //    private final AtomicLong startNanos = new AtomicLong();
   //    private final AtomicLong startFullGcCount = new AtomicLong(-1);
@@ -84,9 +88,12 @@ public class FragmentInstanceContext extends QueryContext {
   //    private final AtomicLong endFullGcTimeNanos = new AtomicLong(-1);
 
   public static FragmentInstanceContext createFragmentInstanceContext(
-      FragmentInstanceId id, FragmentInstanceStateMachine stateMachine, SessionInfo sessionInfo) {
+      FragmentInstanceId id,
+      FragmentInstanceStateMachine stateMachine,
+      FragmentInstance fragmentInstance) {
     FragmentInstanceContext instanceContext =
-        new FragmentInstanceContext(id, stateMachine, sessionInfo);
+        new FragmentInstanceContext(id, stateMachine, fragmentInstance.getSessionInfo());
+    instanceContext.setDegreeOfParallelism(calculateDegreeOfParallelism(fragmentInstance));
     instanceContext.initialize();
     instanceContext.start();
     return instanceContext;
@@ -95,11 +102,16 @@ public class FragmentInstanceContext extends QueryContext {
   public static FragmentInstanceContext createFragmentInstanceContext(
       FragmentInstanceId id,
       FragmentInstanceStateMachine stateMachine,
-      SessionInfo sessionInfo,
       IDataRegionForQuery dataRegion,
-      Filter timeFilter) {
+      FragmentInstance fragmentInstance) {
     FragmentInstanceContext instanceContext =
-        new FragmentInstanceContext(id, stateMachine, sessionInfo, dataRegion, timeFilter);
+        new FragmentInstanceContext(
+            id,
+            stateMachine,
+            fragmentInstance.getSessionInfo(),
+            dataRegion,
+            fragmentInstance.getTimeFilter());
+    instanceContext.setDegreeOfParallelism(calculateDegreeOfParallelism(fragmentInstance));
     instanceContext.initialize();
     instanceContext.start();
     return instanceContext;
@@ -368,4 +380,24 @@ public class FragmentInstanceContext extends QueryContext {
     sourcePaths = null;
     sharedQueryDataSource = null;
   }
+
+  // Splits the node-wide dop among co-located instances; root also takes the remainder.
+  private static int calculateDegreeOfParallelism(FragmentInstance fragmentInstance) {
+    int systemDop = IoTDBDescriptor.getInstance().getConfig().getDegreeOfParallelism();
+    int instanceNumInDataNode = fragmentInstance.getInstanceNumInDataNode();
+    // A non-positive count (e.g. never set) would divide by zero below, so run it serially.
+    if (instanceNumInDataNode <= 0 || instanceNumInDataNode >= systemDop) {
+      return 1;
+    }
+    int share = systemDop / instanceNumInDataNode;
+    return fragmentInstance.isRoot() ? share + systemDop % instanceNumInDataNode : share;
+  }
+
+  public void setDegreeOfParallelism(int degreeOfParallelism) {
+    this.degreeOfParallelism = degreeOfParallelism;
+  }
+
+  public int getDegreeOfParallelism() {
+    return degreeOfParallelism;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
index 6e5390fdc1..47fbd43273 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
@@ -123,11 +123,7 @@ public class FragmentInstanceManager {
                         instanceId,
                         fragmentInstanceId ->
                             createFragmentInstanceContext(
-                                fragmentInstanceId,
-                                stateMachine,
-                                instance.getSessionInfo(),
-                                dataRegion,
-                                instance.getTimeFilter()));
+                                fragmentInstanceId, stateMachine, dataRegion, instance));
 
                 try {
                   List<PipelineDriverFactory> driverFactories =
@@ -190,7 +186,7 @@ public class FragmentInstanceManager {
                       instanceId,
                       fragmentInstanceId ->
                           createFragmentInstanceContext(
-                              fragmentInstanceId, stateMachine, instance.getSessionInfo()));
+                              fragmentInstanceId, stateMachine, instance));
 
               try {
                 List<PipelineDriverFactory> driverFactories =
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java
index 3dfde552ca..f6e4eb4d47 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java
@@ -89,6 +89,7 @@ public class LocalExecutionPlanContext {
     this.nextPipelineId = new AtomicInteger(0);
     this.driverContext = new DataDriverContext(instanceContext, getNextPipelineId());
     this.pipelineDriverFactories = new ArrayList<>();
+    this.degreeOfParallelism = instanceContext.getDegreeOfParallelism();
   }
 
   // For creating subContext, differ from parent context mainly in driver context
@@ -119,6 +120,7 @@ public class LocalExecutionPlanContext {
     this.dataRegionTTL = Long.MAX_VALUE;
     this.driverContext =
         new SchemaDriverContext(instanceContext, schemaRegion, getNextPipelineId());
+    this.degreeOfParallelism = instanceContext.getDegreeOfParallelism();
     this.pipelineDriverFactories = new ArrayList<>();
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java
index 6361a91f98..9ccf34bece 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java
@@ -57,15 +57,17 @@ import java.util.Map;
 public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner {
   private static final Logger logger = LoggerFactory.getLogger(SimpleFragmentParallelPlanner.class);
 
-  private SubPlan subPlan;
-  private Analysis analysis;
-  private MPPQueryContext queryContext;
+  private final SubPlan subPlan;
+  private final Analysis analysis;
+  private final MPPQueryContext queryContext;
 
   // Record all the FragmentInstances belonged to same PlanFragment
-  Map<PlanFragmentId, FragmentInstance> instanceMap;
+  private final Map<PlanFragmentId, FragmentInstance> instanceMap;
   // Record which PlanFragment the PlanNode belongs
-  Map<PlanNodeId, Pair<PlanFragmentId, PlanNode>> planNodeMap;
-  List<FragmentInstance> fragmentInstanceList;
+  private final Map<PlanNodeId, Pair<PlanFragmentId, PlanNode>> planNodeMap;
+  private final List<FragmentInstance> fragmentInstanceList;
+  // Record which FragmentInstances are allocated to each dataNode
+  private final Map<TDataNodeLocation, List<FragmentInstance>> locationToInstanceMap;
 
   public SimpleFragmentParallelPlanner(
       SubPlan subPlan, Analysis analysis, MPPQueryContext context) {
@@ -75,6 +77,7 @@ public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner {
     this.instanceMap = new HashMap<>();
     this.planNodeMap = new HashMap<>();
     this.fragmentInstanceList = new ArrayList<>();
+    this.locationToInstanceMap = new HashMap<>();
   }
 
   @Override
@@ -90,6 +93,13 @@ public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner {
       recordPlanNodeRelation(fragment.getPlanNodeTree(), fragment.getId());
       produceFragmentInstance(fragment);
     }
+    for (Map.Entry<TDataNodeLocation, List<FragmentInstance>> entry :
+        locationToInstanceMap.entrySet()) {
+      List<FragmentInstance> fragmentInstances = entry.getValue();
+      int instanceNumInDataNode = fragmentInstances.size();
+      fragmentInstances.forEach(
+          instance -> instance.setInstanceNumInDataNode(instanceNumInDataNode));
+    }
   }
 
   private void produceFragmentInstance(PlanFragment fragment) {
@@ -137,6 +147,9 @@ public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner {
     }
     instanceMap.putIfAbsent(fragment.getId(), fragmentInstance);
     fragmentInstanceList.add(fragmentInstance);
+    locationToInstanceMap
+        .computeIfAbsent(fragmentInstance.getHostDataNode(), x -> new ArrayList<>())
+        .add(fragmentInstance);
   }
 
   private TDataNodeLocation selectTargetDataNode(TRegionReplicaSet regionReplicaSet) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/FragmentInstance.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/FragmentInstance.java
index 97bd7e1748..310b3f7c23 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/FragmentInstance.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/FragmentInstance.java
@@ -68,6 +68,8 @@ public class FragmentInstance implements IConsensusRequest {
 
   private final SessionInfo sessionInfo;
 
+  private int instanceNumInDataNode;
+
   // We can add some more params for a specific FragmentInstance
   // So that we can make different FragmentInstance owns different data range.
 
@@ -126,6 +128,14 @@ public class FragmentInstance implements IConsensusRequest {
     return executorType.getRegionReplicaSet();
   }
 
+  public void setInstanceNumInDataNode(int instanceNumInDataNode) {
+    this.instanceNumInDataNode = instanceNumInDataNode;
+  }
+
+  public int getInstanceNumInDataNode() {
+    return instanceNumInDataNode;
+  }
+
   public PlanFragment getFragment() {
     return fragment;
   }
@@ -183,6 +193,7 @@ public class FragmentInstance implements IConsensusRequest {
     boolean hasHostDataNode = ReadWriteIOUtils.readBool(buffer);
     fragmentInstance.hostDataNode =
         hasHostDataNode ? ThriftCommonsSerDeUtils.deserializeTDataNodeLocation(buffer) : null;
+    fragmentInstance.setInstanceNumInDataNode(ReadWriteIOUtils.readInt(buffer));
     return fragmentInstance;
   }
 
@@ -205,6 +216,7 @@ public class FragmentInstance implements IConsensusRequest {
       if (hostDataNode != null) {
         ThriftCommonsSerDeUtils.serializeTDataNodeLocation(hostDataNode, outputStream);
       }
+      ReadWriteIOUtils.write(instanceNumInDataNode, outputStream);
       return ByteBuffer.wrap(publicBAOS.getBuf(), 0, publicBAOS.size());
     } catch (IOException e) {
       logger.error("Unexpected error occurs when serializing this FragmentInstance.", e);