You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by zy...@apache.org on 2022/12/15 09:44:02 UTC
[iotdb] 05/08: save trial
This is an automated email from the ASF dual-hosted git repository.
zyk pushed a commit to branch concurrent_schema_fetch
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 2b069950bc07c51c50952bb2b63d610505cac49a
Author: Marccos <15...@qq.com>
AuthorDate: Wed Dec 14 16:09:43 2022 +0800
save trial
---
.../analyze/schema/ClusterSchemaFetchExecutor.java | 128 ++++++++++++++-------
.../plan/analyze/schema/ClusterSchemaFetcher.java | 2 +-
2 files changed, 86 insertions(+), 44 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetchExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetchExecutor.java
index 75fd5623ac..2008dadaeb 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetchExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetchExecutor.java
@@ -59,11 +59,13 @@ class ClusterSchemaFetchExecutor {
private final BiFunction<Long, Statement, ExecutionResult> statementExecutor;
private final Function<PartialPath, Map<Integer, Template>> templateSetInfoProvider;
- private final Map<PartialPath, Pair<AtomicInteger, ClusterSchemaTree>> fetchedResultMap = new ConcurrentHashMap<>();
+ private final Map<PartialPath, Pair<AtomicInteger, ClusterSchemaTree>> fetchedResultMap =
+ new ConcurrentHashMap<>();
- private final Map<PartialPath, Pair<AtomicInteger, Set<String>>> executingTaskMap = new ConcurrentHashMap<>();
+ private final Map<PartialPath, DeviceSchemaFetchTask> executingTaskMap =
+ new ConcurrentHashMap<>();
- private final Map<PartialPath, Pair<AtomicInteger, Set<String>>> waitingTaskMap = new ConcurrentHashMap<>();
+ private final Map<PartialPath, DeviceSchemaFetchTask> waitingTaskMap = new ConcurrentHashMap<>();
ClusterSchemaFetchExecutor(
Coordinator coordinator,
@@ -76,54 +78,69 @@ class ClusterSchemaFetchExecutor {
this.templateSetInfoProvider = templateSetInfoProvider;
}
- ClusterSchemaTree fetchSchemaOfOneDevice(PartialPath devicePath, List<String> measurements){
+ ClusterSchemaTree fetchSchemaOfOneDevice(PartialPath devicePath, List<String> measurements) {
final AtomicBoolean shouldWait = new AtomicBoolean(true);
- executingTaskMap.compute(devicePath, (key, value) -> {
- if (value == null){
- return null;
- }
- if (value.right.size() < measurements.size()){
- shouldWait.set(false);
- return value;
- }
- for (String measurement: measurements){
- if (!value.right.contains(measurement)){
- shouldWait.set(false);
+ executingTaskMap.compute(
+ devicePath,
+ (key, value) -> {
+ if (value == null) {
+ return null;
+ }
+ shouldWait.set(value.checkAndAddWaitingThread(measurements));
return value;
+ });
+
+ if (!shouldWait.get()) {
+ DeviceSchemaFetchTask task =
+ waitingTaskMap.compute(
+ devicePath,
+ (key, value) -> {
+ if (value == null) {
+ value = new DeviceSchemaFetchTask();
+ }
+ value.addWaitingThread(measurements);
+ return value;
+ });
+ if (executingTaskMap.get(devicePath) != task) {
+ synchronized (task) {
+ if (executingTaskMap.get(devicePath) != task) {
+ while (true) {
+ if (executingTaskMap.computeIfAbsent(devicePath, key -> task) != task) {
+ break;
+ }
+ }
+ PathPatternTree patternTree = new PathPatternTree();
+ for (String measurement : task.measurementSet) {
+ patternTree.appendFullPath(devicePath, measurement);
+ }
+ ClusterSchemaTree fetchedSchemaTree =
+ executeSchemaFetchQuery(
+ new SchemaFetchStatement(
+ patternTree, templateSetInfoProvider.apply(devicePath), false));
+ Pair<AtomicInteger, ClusterSchemaTree> fetchSchemaResult =
+ new Pair<>(new AtomicInteger(task.waitingThreadNum), fetchedSchemaTree);
+ while (true) {
+ if (fetchedResultMap.computeIfAbsent(devicePath, key -> fetchSchemaResult)
+ != fetchSchemaResult) {
+ break;
+ }
+ }
+ }
}
}
- value.left.getAndIncrement();
- return value;
- });
- if (shouldWait.get()){
- while (!fetchedResultMap.containsKey(devicePath));
- ClusterSchemaTree schemaTree = new ClusterSchemaTree();
- Pair<AtomicInteger, ClusterSchemaTree> pair = fetchedResultMap.get(devicePath);
- if (pair.left.decrementAndGet() == 0){
- fetchedResultMap.remove(devicePath);
- }
- }else {
- Pair<AtomicInteger, Set<String>> task = waitingTaskMap.compute(devicePath, (key, value) -> {
- if (value == null){
- value = new Pair<>(new AtomicInteger(0), new HashSet<>());
- }
- value.left.getAndIncrement();
- value.right.addAll(measurements);
- return value;
- });
- synchronized (task){
- while (executingTaskMap.computeIfAbsent(devicePath, key -> task)!=task);
- PathPatternTree patternTree = new PathPatternTree();
- for (String measurement: task.right){
- patternTree.appendFullPath(devicePath, measurement);
- }
- ClusterSchemaTree fetchedSchemaTree = executeSchemaFetchQuery(new SchemaFetchStatement(patternTree, templateSetInfoProvider.apply(devicePath), false));
- Pair<AtomicInteger, ClusterSchemaTree> fetchSchemaResult = new Pair<>(task.left, fetchedSchemaTree);
- while (fetchedResultMap.computeIfAbsent(devicePath, key -> fetchSchemaResult) != fetchSchemaResult);
+ }
+ while (true) {
+ if (!fetchedResultMap.containsKey(devicePath)) {
+ break;
}
}
+ ClusterSchemaTree schemaTree = new ClusterSchemaTree();
+ Pair<AtomicInteger, ClusterSchemaTree> pair = fetchedResultMap.get(devicePath);
+ if (pair.left.decrementAndGet() == 0) {
+ fetchedResultMap.remove(devicePath);
+ }
return null;
}
@@ -186,4 +203,29 @@ class ClusterSchemaFetchExecutor {
// Totally memory operation. This case won't happen.
}
}
+
+ private static class DeviceSchemaFetchTask {
+
+ private int waitingThreadNum = 0;
+
+ private final Set<String> measurementSet = new HashSet<>();
+
+ private boolean checkAndAddWaitingThread(List<String> measurements) {
+ if (measurementSet.size() < measurements.size()) {
+ return false;
+ }
+ for (String measurement : measurements) {
+ if (!measurementSet.contains(measurement)) {
+ return false;
+ }
+ }
+ waitingThreadNum++;
+ return true;
+ }
+
+ private void addWaitingThread(List<String> measurements) {
+ waitingThreadNum++;
+ measurementSet.addAll(measurements);
+ }
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetcher.java
index 564b97bfba..52eabcdcc6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetcher.java
@@ -84,7 +84,7 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
ClusterPartitionFetcher.getInstance(),
this,
config.getQueryTimeoutThreshold()),
- templateManager::checkAllRelatedTemplate);
+ templateManager::checkAllRelatedTemplate);
private static final class ClusterSchemaFetcherHolder {
private static final ClusterSchemaFetcher INSTANCE = new ClusterSchemaFetcher();