You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2023/03/09 08:07:20 UTC
[iotdb] 01/01: Implement all fragment instance sharing dop in one dataNode
This is an automated email from the ASF dual-hosted git repository.
xiangweiwei pushed a commit to branch FIDop
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 3023b793babd089c5c9173ca65d20bd197eb5c3b
Author: Alima777 <wx...@gmail.com>
AuthorDate: Thu Mar 9 16:06:57 2023 +0800
Implement all fragment instance sharing dop in one dataNode
---
.../fragment/FragmentInstanceContext.java | 42 +++++++++++++++++++---
.../fragment/FragmentInstanceManager.java | 8 ++---
.../plan/planner/LocalExecutionPlanContext.java | 2 ++
.../SimpleFragmentParallelPlanner.java | 25 +++++++++----
.../db/mpp/plan/planner/plan/FragmentInstance.java | 12 +++++++
5 files changed, 72 insertions(+), 17 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
index f810bc1ddb..b299dd92ae 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.mpp.execution.fragment;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.engine.storagegroup.IDataRegionForQuery;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
@@ -27,6 +28,7 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.idtable.IDTable;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.common.SessionInfo;
+import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.FileReaderManager;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
@@ -75,6 +77,8 @@ public class FragmentInstanceContext extends QueryContext {
// session info
private SessionInfo sessionInfo;
+ private int degreeOfParallelism = 1;
+
// private final GcMonitor gcMonitor;
// private final AtomicLong startNanos = new AtomicLong();
// private final AtomicLong startFullGcCount = new AtomicLong(-1);
@@ -84,9 +88,12 @@ public class FragmentInstanceContext extends QueryContext {
// private final AtomicLong endFullGcTimeNanos = new AtomicLong(-1);
public static FragmentInstanceContext createFragmentInstanceContext(
- FragmentInstanceId id, FragmentInstanceStateMachine stateMachine, SessionInfo sessionInfo) {
+ FragmentInstanceId id,
+ FragmentInstanceStateMachine stateMachine,
+ FragmentInstance fragmentInstance) {
FragmentInstanceContext instanceContext =
- new FragmentInstanceContext(id, stateMachine, sessionInfo);
+ new FragmentInstanceContext(id, stateMachine, fragmentInstance.getSessionInfo());
+ instanceContext.setDegreeOfParallelism(calculateDegreeOfParallelism(fragmentInstance));
instanceContext.initialize();
instanceContext.start();
return instanceContext;
@@ -95,11 +102,16 @@ public class FragmentInstanceContext extends QueryContext {
public static FragmentInstanceContext createFragmentInstanceContext(
FragmentInstanceId id,
FragmentInstanceStateMachine stateMachine,
- SessionInfo sessionInfo,
IDataRegionForQuery dataRegion,
- Filter timeFilter) {
+ FragmentInstance fragmentInstance) {
FragmentInstanceContext instanceContext =
- new FragmentInstanceContext(id, stateMachine, sessionInfo, dataRegion, timeFilter);
+ new FragmentInstanceContext(
+ id,
+ stateMachine,
+ fragmentInstance.getSessionInfo(),
+ dataRegion,
+ fragmentInstance.getTimeFilter());
+ instanceContext.setDegreeOfParallelism(calculateDegreeOfParallelism(fragmentInstance));
instanceContext.initialize();
instanceContext.start();
return instanceContext;
@@ -368,4 +380,24 @@ public class FragmentInstanceContext extends QueryContext {
sourcePaths = null;
sharedQueryDataSource = null;
}
+
+ private static int calculateDegreeOfParallelism(FragmentInstance fragmentInstance) {
+ int systemDop = IoTDBDescriptor.getInstance().getConfig().getDegreeOfParallelism();
+ int instanceNumInDataNode = fragmentInstance.getInstanceNumInDataNode();
+ if (instanceNumInDataNode >= systemDop) {
+ return 1;
+ } else if (fragmentInstance.isRoot()) {
+ return systemDop / instanceNumInDataNode + systemDop % instanceNumInDataNode;
+ } else {
+ return systemDop / instanceNumInDataNode;
+ }
+ }
+
+ public void setDegreeOfParallelism(int degreeOfParallelism) {
+ this.degreeOfParallelism = degreeOfParallelism;
+ }
+
+ public int getDegreeOfParallelism() {
+ return degreeOfParallelism;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
index 6e5390fdc1..47fbd43273 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
@@ -123,11 +123,7 @@ public class FragmentInstanceManager {
instanceId,
fragmentInstanceId ->
createFragmentInstanceContext(
- fragmentInstanceId,
- stateMachine,
- instance.getSessionInfo(),
- dataRegion,
- instance.getTimeFilter()));
+ fragmentInstanceId, stateMachine, dataRegion, instance));
try {
List<PipelineDriverFactory> driverFactories =
@@ -190,7 +186,7 @@ public class FragmentInstanceManager {
instanceId,
fragmentInstanceId ->
createFragmentInstanceContext(
- fragmentInstanceId, stateMachine, instance.getSessionInfo()));
+ fragmentInstanceId, stateMachine, instance));
try {
List<PipelineDriverFactory> driverFactories =
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java
index 3dfde552ca..f6e4eb4d47 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java
@@ -89,6 +89,7 @@ public class LocalExecutionPlanContext {
this.nextPipelineId = new AtomicInteger(0);
this.driverContext = new DataDriverContext(instanceContext, getNextPipelineId());
this.pipelineDriverFactories = new ArrayList<>();
+ this.degreeOfParallelism = instanceContext.getDegreeOfParallelism();
}
// For creating subContext, differ from parent context mainly in driver context
@@ -119,6 +120,7 @@ public class LocalExecutionPlanContext {
this.dataRegionTTL = Long.MAX_VALUE;
this.driverContext =
new SchemaDriverContext(instanceContext, schemaRegion, getNextPipelineId());
+ this.degreeOfParallelism = instanceContext.getDegreeOfParallelism();
this.pipelineDriverFactories = new ArrayList<>();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java
index 6361a91f98..9ccf34bece 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java
@@ -57,15 +57,17 @@ import java.util.Map;
public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner {
private static final Logger logger = LoggerFactory.getLogger(SimpleFragmentParallelPlanner.class);
- private SubPlan subPlan;
- private Analysis analysis;
- private MPPQueryContext queryContext;
+ private final SubPlan subPlan;
+ private final Analysis analysis;
+ private final MPPQueryContext queryContext;
// Record all the FragmentInstances belonged to same PlanFragment
- Map<PlanFragmentId, FragmentInstance> instanceMap;
+ private final Map<PlanFragmentId, FragmentInstance> instanceMap;
// Record which PlanFragment the PlanNode belongs
- Map<PlanNodeId, Pair<PlanFragmentId, PlanNode>> planNodeMap;
- List<FragmentInstance> fragmentInstanceList;
+ private final Map<PlanNodeId, Pair<PlanFragmentId, PlanNode>> planNodeMap;
+ private final List<FragmentInstance> fragmentInstanceList;
+ // Record which dataNode each instance allocated to
+ private final Map<TDataNodeLocation, List<FragmentInstance>> locationToInstanceMap;
public SimpleFragmentParallelPlanner(
SubPlan subPlan, Analysis analysis, MPPQueryContext context) {
@@ -75,6 +77,7 @@ public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner {
this.instanceMap = new HashMap<>();
this.planNodeMap = new HashMap<>();
this.fragmentInstanceList = new ArrayList<>();
+ this.locationToInstanceMap = new HashMap<>();
}
@Override
@@ -90,6 +93,13 @@ public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner {
recordPlanNodeRelation(fragment.getPlanNodeTree(), fragment.getId());
produceFragmentInstance(fragment);
}
+ for (Map.Entry<TDataNodeLocation, List<FragmentInstance>> entry :
+ locationToInstanceMap.entrySet()) {
+ List<FragmentInstance> fragmentInstances = entry.getValue();
+ int instanceNumInDataNode = fragmentInstances.size();
+ fragmentInstances.forEach(
+ instance -> instance.setInstanceNumInDataNode(instanceNumInDataNode));
+ }
}
private void produceFragmentInstance(PlanFragment fragment) {
@@ -137,6 +147,9 @@ public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner {
}
instanceMap.putIfAbsent(fragment.getId(), fragmentInstance);
fragmentInstanceList.add(fragmentInstance);
+ locationToInstanceMap
+ .computeIfAbsent(fragmentInstance.getHostDataNode(), x -> new ArrayList<>())
+ .add(fragmentInstance);
}
private TDataNodeLocation selectTargetDataNode(TRegionReplicaSet regionReplicaSet) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/FragmentInstance.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/FragmentInstance.java
index 97bd7e1748..310b3f7c23 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/FragmentInstance.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/FragmentInstance.java
@@ -68,6 +68,8 @@ public class FragmentInstance implements IConsensusRequest {
private final SessionInfo sessionInfo;
+ private int instanceNumInDataNode;
+
// We can add some more params for a specific FragmentInstance
// So that we can make different FragmentInstance owns different data range.
@@ -126,6 +128,14 @@ public class FragmentInstance implements IConsensusRequest {
return executorType.getRegionReplicaSet();
}
+ public void setInstanceNumInDataNode(int instanceNumInDataNode) {
+ this.instanceNumInDataNode = instanceNumInDataNode;
+ }
+
+ public int getInstanceNumInDataNode() {
+ return instanceNumInDataNode;
+ }
+
public PlanFragment getFragment() {
return fragment;
}
@@ -183,6 +193,7 @@ public class FragmentInstance implements IConsensusRequest {
boolean hasHostDataNode = ReadWriteIOUtils.readBool(buffer);
fragmentInstance.hostDataNode =
hasHostDataNode ? ThriftCommonsSerDeUtils.deserializeTDataNodeLocation(buffer) : null;
+ fragmentInstance.setInstanceNumInDataNode(ReadWriteIOUtils.readInt(buffer));
return fragmentInstance;
}
@@ -205,6 +216,7 @@ public class FragmentInstance implements IConsensusRequest {
if (hostDataNode != null) {
ThriftCommonsSerDeUtils.serializeTDataNodeLocation(hostDataNode, outputStream);
}
+ ReadWriteIOUtils.write(instanceNumInDataNode, outputStream);
return ByteBuffer.wrap(publicBAOS.getBuf(), 0, publicBAOS.size());
} catch (IOException e) {
logger.error("Unexpected error occurs when serializing this FragmentInstance.", e);