You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2023/03/09 08:07:19 UTC

[iotdb] branch FIDop created (now 3023b793ba)

This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a change to branch FIDop
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at 3023b793ba Implement all fragment instance sharing dop in one dataNode

This branch includes the following new commits:

     new 3023b793ba Implement all fragment instance sharing dop in one dataNode

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: Implement all fragment instance sharing dop in one dataNode

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a commit to branch FIDop
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 3023b793babd089c5c9173ca65d20bd197eb5c3b
Author: Alima777 <wx...@gmail.com>
AuthorDate: Thu Mar 9 16:06:57 2023 +0800

    Implement all fragment instance sharing dop in one dataNode
---
 .../fragment/FragmentInstanceContext.java          | 42 +++++++++++++++++++---
 .../fragment/FragmentInstanceManager.java          |  8 ++---
 .../plan/planner/LocalExecutionPlanContext.java    |  2 ++
 .../SimpleFragmentParallelPlanner.java             | 25 +++++++++----
 .../db/mpp/plan/planner/plan/FragmentInstance.java | 12 +++++++
 5 files changed, 72 insertions(+), 17 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
index f810bc1ddb..b299dd92ae 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.mpp.execution.fragment;
 
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.engine.storagegroup.IDataRegionForQuery;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
@@ -27,6 +28,7 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.idtable.IDTable;
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.common.SessionInfo;
+import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.control.FileReaderManager;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
@@ -75,6 +77,8 @@ public class FragmentInstanceContext extends QueryContext {
   // session info
   private SessionInfo sessionInfo;
 
+  private int degreeOfParallelism = 1;
+
   //    private final GcMonitor gcMonitor;
   //    private final AtomicLong startNanos = new AtomicLong();
   //    private final AtomicLong startFullGcCount = new AtomicLong(-1);
@@ -84,9 +88,12 @@ public class FragmentInstanceContext extends QueryContext {
   //    private final AtomicLong endFullGcTimeNanos = new AtomicLong(-1);
 
   public static FragmentInstanceContext createFragmentInstanceContext(
-      FragmentInstanceId id, FragmentInstanceStateMachine stateMachine, SessionInfo sessionInfo) {
+      FragmentInstanceId id,
+      FragmentInstanceStateMachine stateMachine,
+      FragmentInstance fragmentInstance) {
     FragmentInstanceContext instanceContext =
-        new FragmentInstanceContext(id, stateMachine, sessionInfo);
+        new FragmentInstanceContext(id, stateMachine, fragmentInstance.getSessionInfo());
+    instanceContext.setDegreeOfParallelism(calculateDegreeOfParallelism(fragmentInstance));
     instanceContext.initialize();
     instanceContext.start();
     return instanceContext;
@@ -95,11 +102,16 @@ public class FragmentInstanceContext extends QueryContext {
   public static FragmentInstanceContext createFragmentInstanceContext(
       FragmentInstanceId id,
       FragmentInstanceStateMachine stateMachine,
-      SessionInfo sessionInfo,
       IDataRegionForQuery dataRegion,
-      Filter timeFilter) {
+      FragmentInstance fragmentInstance) {
     FragmentInstanceContext instanceContext =
-        new FragmentInstanceContext(id, stateMachine, sessionInfo, dataRegion, timeFilter);
+        new FragmentInstanceContext(
+            id,
+            stateMachine,
+            fragmentInstance.getSessionInfo(),
+            dataRegion,
+            fragmentInstance.getTimeFilter());
+    instanceContext.setDegreeOfParallelism(calculateDegreeOfParallelism(fragmentInstance));
     instanceContext.initialize();
     instanceContext.start();
     return instanceContext;
@@ -368,4 +380,24 @@ public class FragmentInstanceContext extends QueryContext {
     sourcePaths = null;
     sharedQueryDataSource = null;
   }
+
+  private static int calculateDegreeOfParallelism(FragmentInstance fragmentInstance) {
+    int systemDop = IoTDBDescriptor.getInstance().getConfig().getDegreeOfParallelism();
+    int instanceNumInDataNode = fragmentInstance.getInstanceNumInDataNode();
+    if (instanceNumInDataNode >= systemDop) {
+      return 1;
+    } else if (fragmentInstance.isRoot()) {
+      return systemDop / instanceNumInDataNode + systemDop % instanceNumInDataNode;
+    } else {
+      return systemDop / instanceNumInDataNode;
+    }
+  }
+
+  public void setDegreeOfParallelism(int degreeOfParallelism) {
+    this.degreeOfParallelism = degreeOfParallelism;
+  }
+
+  public int getDegreeOfParallelism() {
+    return degreeOfParallelism;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
index 6e5390fdc1..47fbd43273 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
@@ -123,11 +123,7 @@ public class FragmentInstanceManager {
                         instanceId,
                         fragmentInstanceId ->
                             createFragmentInstanceContext(
-                                fragmentInstanceId,
-                                stateMachine,
-                                instance.getSessionInfo(),
-                                dataRegion,
-                                instance.getTimeFilter()));
+                                fragmentInstanceId, stateMachine, dataRegion, instance));
 
                 try {
                   List<PipelineDriverFactory> driverFactories =
@@ -190,7 +186,7 @@ public class FragmentInstanceManager {
                       instanceId,
                       fragmentInstanceId ->
                           createFragmentInstanceContext(
-                              fragmentInstanceId, stateMachine, instance.getSessionInfo()));
+                              fragmentInstanceId, stateMachine, instance));
 
               try {
                 List<PipelineDriverFactory> driverFactories =
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java
index 3dfde552ca..f6e4eb4d47 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java
@@ -89,6 +89,7 @@ public class LocalExecutionPlanContext {
     this.nextPipelineId = new AtomicInteger(0);
     this.driverContext = new DataDriverContext(instanceContext, getNextPipelineId());
     this.pipelineDriverFactories = new ArrayList<>();
+    this.degreeOfParallelism = instanceContext.getDegreeOfParallelism();
   }
 
   // For creating subContext, differ from parent context mainly in driver context
@@ -119,6 +120,7 @@ public class LocalExecutionPlanContext {
     this.dataRegionTTL = Long.MAX_VALUE;
     this.driverContext =
         new SchemaDriverContext(instanceContext, schemaRegion, getNextPipelineId());
+    this.degreeOfParallelism = instanceContext.getDegreeOfParallelism();
     this.pipelineDriverFactories = new ArrayList<>();
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java
index 6361a91f98..9ccf34bece 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java
@@ -57,15 +57,17 @@ import java.util.Map;
 public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner {
   private static final Logger logger = LoggerFactory.getLogger(SimpleFragmentParallelPlanner.class);
 
-  private SubPlan subPlan;
-  private Analysis analysis;
-  private MPPQueryContext queryContext;
+  private final SubPlan subPlan;
+  private final Analysis analysis;
+  private final MPPQueryContext queryContext;
 
   // Record all the FragmentInstances belonged to same PlanFragment
-  Map<PlanFragmentId, FragmentInstance> instanceMap;
+  private final Map<PlanFragmentId, FragmentInstance> instanceMap;
   // Record which PlanFragment the PlanNode belongs
-  Map<PlanNodeId, Pair<PlanFragmentId, PlanNode>> planNodeMap;
-  List<FragmentInstance> fragmentInstanceList;
+  private final Map<PlanNodeId, Pair<PlanFragmentId, PlanNode>> planNodeMap;
+  private final List<FragmentInstance> fragmentInstanceList;
+  // Record which dataNode each instance allocated to
+  private final Map<TDataNodeLocation, List<FragmentInstance>> locationToInstanceMap;
 
   public SimpleFragmentParallelPlanner(
       SubPlan subPlan, Analysis analysis, MPPQueryContext context) {
@@ -75,6 +77,7 @@ public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner {
     this.instanceMap = new HashMap<>();
     this.planNodeMap = new HashMap<>();
     this.fragmentInstanceList = new ArrayList<>();
+    this.locationToInstanceMap = new HashMap<>();
   }
 
   @Override
@@ -90,6 +93,13 @@ public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner {
       recordPlanNodeRelation(fragment.getPlanNodeTree(), fragment.getId());
       produceFragmentInstance(fragment);
     }
+    for (Map.Entry<TDataNodeLocation, List<FragmentInstance>> entry :
+        locationToInstanceMap.entrySet()) {
+      List<FragmentInstance> fragmentInstances = entry.getValue();
+      int instanceNumInDataNode = fragmentInstances.size();
+      fragmentInstances.forEach(
+          instance -> instance.setInstanceNumInDataNode(instanceNumInDataNode));
+    }
   }
 
   private void produceFragmentInstance(PlanFragment fragment) {
@@ -137,6 +147,9 @@ public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner {
     }
     instanceMap.putIfAbsent(fragment.getId(), fragmentInstance);
     fragmentInstanceList.add(fragmentInstance);
+    locationToInstanceMap
+        .computeIfAbsent(fragmentInstance.getHostDataNode(), x -> new ArrayList<>())
+        .add(fragmentInstance);
   }
 
   private TDataNodeLocation selectTargetDataNode(TRegionReplicaSet regionReplicaSet) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/FragmentInstance.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/FragmentInstance.java
index 97bd7e1748..310b3f7c23 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/FragmentInstance.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/FragmentInstance.java
@@ -68,6 +68,8 @@ public class FragmentInstance implements IConsensusRequest {
 
   private final SessionInfo sessionInfo;
 
+  private int instanceNumInDataNode;
+
   // We can add some more params for a specific FragmentInstance
   // So that we can make different FragmentInstance owns different data range.
 
@@ -126,6 +128,14 @@ public class FragmentInstance implements IConsensusRequest {
     return executorType.getRegionReplicaSet();
   }
 
+  public void setInstanceNumInDataNode(int instanceNumInDataNode) {
+    this.instanceNumInDataNode = instanceNumInDataNode;
+  }
+
+  public int getInstanceNumInDataNode() {
+    return instanceNumInDataNode;
+  }
+
   public PlanFragment getFragment() {
     return fragment;
   }
@@ -183,6 +193,7 @@ public class FragmentInstance implements IConsensusRequest {
     boolean hasHostDataNode = ReadWriteIOUtils.readBool(buffer);
     fragmentInstance.hostDataNode =
         hasHostDataNode ? ThriftCommonsSerDeUtils.deserializeTDataNodeLocation(buffer) : null;
+    fragmentInstance.setInstanceNumInDataNode(ReadWriteIOUtils.readInt(buffer));
     return fragmentInstance;
   }
 
@@ -205,6 +216,7 @@ public class FragmentInstance implements IConsensusRequest {
       if (hostDataNode != null) {
         ThriftCommonsSerDeUtils.serializeTDataNodeLocation(hostDataNode, outputStream);
       }
+      ReadWriteIOUtils.write(instanceNumInDataNode, outputStream);
       return ByteBuffer.wrap(publicBAOS.getBuf(), 0, publicBAOS.size());
     } catch (IOException e) {
       logger.error("Unexpected error occurs when serializing this FragmentInstance.", e);