You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by zy...@apache.org on 2022/12/15 09:44:01 UTC
[iotdb] 04/08: save trial
This is an automated email from the ASF dual-hosted git repository.
zyk pushed a commit to branch concurrent_schema_fetch
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit b5f5b028b5a0987032a306b2a3c4ab0f57bc5337
Author: Marccos <15...@qq.com>
AuthorDate: Wed Dec 14 15:32:02 2022 +0800
save trial
---
.../analyze/schema/ClusterSchemaFetchExecutor.java | 73 +++++++++++++++++++++-
.../plan/analyze/schema/ClusterSchemaFetcher.java | 9 +--
2 files changed, 77 insertions(+), 5 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetchExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetchExecutor.java
index 9b0d3d7c81..75fd5623ac 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetchExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetchExecutor.java
@@ -21,6 +21,9 @@ package org.apache.iotdb.db.mpp.plan.analyze.schema;
import org.apache.iotdb.commons.exception.IoTDBException;
import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.path.PathPatternTree;
+import org.apache.iotdb.db.metadata.template.Template;
import org.apache.iotdb.db.mpp.common.schematree.ClusterSchemaTree;
import org.apache.iotdb.db.mpp.plan.Coordinator;
import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
@@ -31,15 +34,22 @@ import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.read.common.block.column.Column;
import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
+import java.util.function.Function;
import java.util.function.Supplier;
class ClusterSchemaFetchExecutor {
@@ -47,14 +57,75 @@ class ClusterSchemaFetchExecutor {
private final Coordinator coordinator;
private final Supplier<Long> queryIdProvider;
private final BiFunction<Long, Statement, ExecutionResult> statementExecutor;
+ private final Function<PartialPath, Map<Integer, Template>> templateSetInfoProvider; // applied to a device path to obtain the template map passed to SchemaFetchStatement
+ // Per device path: (counter, fetched schema tree); fetchSchemaOfOneDevice publishes here and removes the entry once the counter is decremented to 0.
+ private final Map<PartialPath, Pair<AtomicInteger, ClusterSchemaTree>> fetchedResultMap = new ConcurrentHashMap<>();
+ // Per device path: (counter, measurement names) of the task registered as executing; callers fully covered by it bump the counter.
+ private final Map<PartialPath, Pair<AtomicInteger, Set<String>>> executingTaskMap = new ConcurrentHashMap<>();
+ // Per device path: (counter, measurement names) accumulated from callers that could not piggyback on the executing task.
+ private final Map<PartialPath, Pair<AtomicInteger, Set<String>>> waitingTaskMap = new ConcurrentHashMap<>();
ClusterSchemaFetchExecutor(
Coordinator coordinator,
Supplier<Long> queryIdProvider,
- BiFunction<Long, Statement, ExecutionResult> statementExecutor) {
+ BiFunction<Long, Statement, ExecutionResult> statementExecutor,
+ Function<PartialPath, Map<Integer, Template>> templateSetInfoProvider) { // new 4th argument: maps a path to its template map
this.coordinator = coordinator;
this.queryIdProvider = queryIdProvider;
this.statementExecutor = statementExecutor;
+ this.templateSetInfoProvider = templateSetInfoProvider; // stored as-is (no null check); read by fetchSchemaOfOneDevice when building the fetch statement
+ }
+
+ ClusterSchemaTree fetchSchemaOfOneDevice(PartialPath devicePath, List<String> measurements){
+ // Trial implementation: either waits for a result published by a fetch already registered for devicePath, or merges into a pending task and runs the fetch itself. NOTE(review): currently always returns null.
+ final AtomicBoolean shouldWait = new AtomicBoolean(true); // cleared below when the executing task does not cover every requested measurement
+ executingTaskMap.compute(devicePath, (key, value) -> {
+ if (value == null){
+ return null; // NOTE(review): shouldWait stays true although nothing is executing, so the caller enters the spin-wait below — looks unintended; confirm
+ }
+ if (value.right.size() < measurements.size()){ // quick reject; presumably assumes measurements has no duplicates — TODO confirm
+ shouldWait.set(false);
+ return value;
+ }
+ for (String measurement: measurements){
+ if (!value.right.contains(measurement)){ // at least one requested measurement is not part of the executing task
+ shouldWait.set(false);
+ return value;
+ }
+ }
+ value.left.getAndIncrement(); // fully covered: count this caller on the executing task
+ return value;
+ });
+ if (shouldWait.get()){
+ while (!fetchedResultMap.containsKey(devicePath)); // NOTE(review): busy-wait with no timeout or back-off until a result is published for this device
+ ClusterSchemaTree schemaTree = new ClusterSchemaTree(); // NOTE(review): created but never used in this method
+ Pair<AtomicInteger, ClusterSchemaTree> pair = fetchedResultMap.get(devicePath);
+ if (pair.left.decrementAndGet() == 0){ // the caller that brings the counter to 0 removes the published result
+ fetchedResultMap.remove(devicePath);
+ }
+ }else {
+ Pair<AtomicInteger, Set<String>> task = waitingTaskMap.compute(devicePath, (key, value) -> { // merge this request into the pending task for the device, creating it if absent
+ if (value == null){
+ value = new Pair<>(new AtomicInteger(0), new HashSet<>());
+ }
+ value.left.getAndIncrement();
+ value.right.addAll(measurements); // NOTE(review): plain HashSet mutated here but read under executingTaskMap.compute above — verify thread-safety
+ return value;
+ });
+ synchronized (task){ // callers that obtained the same task pair run this section one at a time
+ while (executingTaskMap.computeIfAbsent(devicePath, key -> task)!=task); // busy-wait until this task is the registered executing task; NOTE(review): no removal from executingTaskMap/waitingTaskMap is visible in this method
+ PathPatternTree patternTree = new PathPatternTree();
+ for (String measurement: task.right){ // one full path (device + measurement) per merged measurement
+ patternTree.appendFullPath(devicePath, measurement);
+ }
+ ClusterSchemaTree fetchedSchemaTree = executeSchemaFetchQuery(new SchemaFetchStatement(patternTree, templateSetInfoProvider.apply(devicePath), false)); // third argument false: presumably withTags — TODO confirm against SchemaFetchStatement
+ Pair<AtomicInteger, ClusterSchemaTree> fetchSchemaResult = new Pair<>(task.left, fetchedSchemaTree); // the published result shares the task's counter object
+ while (fetchedResultMap.computeIfAbsent(devicePath, key -> fetchSchemaResult) != fetchSchemaResult); // busy-wait until this result is the one stored for the device
+ // NOTE(review): fetchedSchemaTree is published to fetchedResultMap but not returned to this caller
+ }
+ }
+ // NOTE(review): both branches fall through to here; the fetched schema is never handed back
+ return null;
}
ClusterSchemaTree executeSchemaFetchQuery(SchemaFetchStatement schemaFetchStatement) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetcher.java
index 4d5a7e498d..564b97bfba 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetcher.java
@@ -83,7 +83,8 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
"",
ClusterPartitionFetcher.getInstance(),
this,
- config.getQueryTimeoutThreshold()));
+ config.getQueryTimeoutThreshold()),
+ templateManager::checkAllRelatedTemplate);
private static final class ClusterSchemaFetcherHolder {
private static final ClusterSchemaFetcher INSTANCE = new ClusterSchemaFetcher();
@@ -119,7 +120,7 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
if (withTags) {
return clusterSchemaFetchExecutor.executeSchemaFetchQuery(
- new SchemaFetchStatement(patternTree, templateMap, withTags));
+ new SchemaFetchStatement(patternTree, templateMap, true));
}
List<PartialPath> fullPathList = new ArrayList<>();
@@ -131,7 +132,7 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
if (fullPathList.isEmpty()) {
return clusterSchemaFetchExecutor.executeSchemaFetchQuery(
- new SchemaFetchStatement(patternTree, templateMap, withTags));
+ new SchemaFetchStatement(patternTree, templateMap, false));
}
// The schema cache R/W and fetch operation must be locked together thus the cache clean
@@ -165,7 +166,7 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
schemaTree =
clusterSchemaFetchExecutor.executeSchemaFetchQuery(
- new SchemaFetchStatement(patternTree, templateMap, withTags));
+ new SchemaFetchStatement(patternTree, templateMap, false));
// only cache the schema fetched by full path
List<MeasurementPath> measurementPathList;