You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by zy...@apache.org on 2022/12/15 09:44:01 UTC

[iotdb] 04/08: save trial

This is an automated email from the ASF dual-hosted git repository.

zyk pushed a commit to branch concurrent_schema_fetch
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit b5f5b028b5a0987032a306b2a3c4ab0f57bc5337
Author: Marccos <15...@qq.com>
AuthorDate: Wed Dec 14 15:32:02 2022 +0800

    save trial
---
 .../analyze/schema/ClusterSchemaFetchExecutor.java | 73 +++++++++++++++++++++-
 .../plan/analyze/schema/ClusterSchemaFetcher.java  |  9 +--
 2 files changed, 77 insertions(+), 5 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetchExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetchExecutor.java
index 9b0d3d7c81..75fd5623ac 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetchExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetchExecutor.java
@@ -21,6 +21,9 @@ package org.apache.iotdb.db.mpp.plan.analyze.schema;
 
 import org.apache.iotdb.commons.exception.IoTDBException;
 import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.path.PathPatternTree;
+import org.apache.iotdb.db.metadata.template.Template;
 import org.apache.iotdb.db.mpp.common.schematree.ClusterSchemaTree;
 import org.apache.iotdb.db.mpp.plan.Coordinator;
 import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
@@ -31,15 +34,22 @@ import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.common.block.column.Column;
 import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.BiFunction;
+import java.util.function.Function;
 import java.util.function.Supplier;
 
 class ClusterSchemaFetchExecutor {
@@ -47,14 +57,75 @@ class ClusterSchemaFetchExecutor {
   private final Coordinator coordinator;
   private final Supplier<Long> queryIdProvider;
   private final BiFunction<Long, Statement, ExecutionResult> statementExecutor;
+  private final Function<PartialPath, Map<Integer, Template>> templateSetInfoProvider; // applied to a device path in fetchSchemaOfOneDevice; its result is passed as the second argument of SchemaFetchStatement
+  // Device path -> (counter, fetched schema tree); fetchSchemaOfOneDevice stores results here and removes an entry once its counter is decremented to zero.
+  private final Map<PartialPath, Pair<AtomicInteger, ClusterSchemaTree>> fetchedResultMap = new ConcurrentHashMap<>();
+  // Device path -> (counter, measurement names) of the task that fetchSchemaOfOneDevice has registered as executing for that device.
+  private final Map<PartialPath, Pair<AtomicInteger, Set<String>>> executingTaskMap = new ConcurrentHashMap<>();
+  // Device path -> (counter, measurement names) merged from requests that the executing task did not cover.
+  private final Map<PartialPath, Pair<AtomicInteger, Set<String>>> waitingTaskMap = new ConcurrentHashMap<>();
 
   ClusterSchemaFetchExecutor(
       Coordinator coordinator,
       Supplier<Long> queryIdProvider,
-      BiFunction<Long, Statement, ExecutionResult> statementExecutor) {
+      BiFunction<Long, Statement, ExecutionResult> statementExecutor,
+      Function<PartialPath, Map<Integer, Template>> templateSetInfoProvider) { // each argument is stored as-is in the field of the same name; nothing is validated or copied
     this.coordinator = coordinator;
     this.queryIdProvider = queryIdProvider;
     this.statementExecutor = statementExecutor;
+    this.templateSetInfoProvider = templateSetInfoProvider;
+  }
+
+  ClusterSchemaTree fetchSchemaOfOneDevice(PartialPath devicePath, List<String> measurements){
+    // Work in progress: appears intended to let concurrent requests for one device share a single schema fetch query (TODO confirm). NOTE(review): the only return statement is "return null".
+    final AtomicBoolean shouldWait = new AtomicBoolean(true); // starts on the waiting path; the lambda below clears it when the executing task does not cover this request
+    executingTaskMap.compute(devicePath, (key, value) -> {
+      if (value == null){
+        return null; // no executing task, mapping stays absent. NOTE(review): shouldWait is still true here, so this caller spins on fetchedResultMap below - confirm intended
+      }
+      if (value.right.size() < measurements.size()){ // executing task holds fewer measurements than requested, so it is treated as not covering this request
+        shouldWait.set(false);
+        return value;
+      }
+      for (String measurement: measurements){
+        if (!value.right.contains(measurement)){ // a requested measurement is missing from the executing task
+          shouldWait.set(false);
+          return value;
+        }
+      }
+      value.left.getAndIncrement(); // executing task contains every requested measurement: bump its counter (presumably the number of callers sharing its result - confirm)
+      return value;
+    });
+    if (shouldWait.get()){
+      while (!fetchedResultMap.containsKey(devicePath)); // busy-spin with no sleep or timeout until a result for this device is present
+      ClusterSchemaTree schemaTree = new ClusterSchemaTree(); // NOTE(review): never read afterwards
+      Pair<AtomicInteger, ClusterSchemaTree> pair = fetchedResultMap.get(devicePath); // NOTE(review): could be null if another thread removed the entry after the containsKey check - confirm
+      if (pair.left.decrementAndGet() == 0){ // the caller that brings the counter to zero removes the stored result
+        fetchedResultMap.remove(devicePath);
+      }
+    }else {
+      Pair<AtomicInteger, Set<String>> task = waitingTaskMap.compute(devicePath, (key, value) -> { // merge this request into the per-device waiting task
+        if (value == null){
+          value = new Pair<>(new AtomicInteger(0), new HashSet<>()); // first request for this device: start an empty task
+        }
+        value.left.getAndIncrement();
+        value.right.addAll(measurements);
+        return value;
+      });
+      synchronized (task){ // monitor is the Pair instance stored in waitingTaskMap
+        while (executingTaskMap.computeIfAbsent(devicePath, key -> task)!=task); // busy-spin until this task is the one mapped in executingTaskMap for the device
+        PathPatternTree patternTree = new PathPatternTree();
+        for (String measurement: task.right){ // one full path per measurement collected in the task
+          patternTree.appendFullPath(devicePath, measurement);
+        }
+        ClusterSchemaTree fetchedSchemaTree = executeSchemaFetchQuery(new SchemaFetchStatement(patternTree, templateSetInfoProvider.apply(devicePath), false));
+        Pair<AtomicInteger, ClusterSchemaTree> fetchSchemaResult = new Pair<>(task.left, fetchedSchemaTree); // the result reuses the task's counter object
+        while (fetchedResultMap.computeIfAbsent(devicePath, key -> fetchSchemaResult) != fetchSchemaResult); // busy-spin until this result is the one stored in fetchedResultMap
+        // NOTE(review): nothing in this method removes devicePath from executingTaskMap or waitingTaskMap.
+      }
+    }
+    // NOTE(review): neither branch hands back the schema tree it obtained; callers always receive null.
+    return null;
+  }
 
   ClusterSchemaTree executeSchemaFetchQuery(SchemaFetchStatement schemaFetchStatement) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetcher.java
index 4d5a7e498d..564b97bfba 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetcher.java
@@ -83,7 +83,8 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
                   "",
                   ClusterPartitionFetcher.getInstance(),
                   this,
-                  config.getQueryTimeoutThreshold()));
+                  config.getQueryTimeoutThreshold()),
+              templateManager::checkAllRelatedTemplate);
 
   private static final class ClusterSchemaFetcherHolder {
     private static final ClusterSchemaFetcher INSTANCE = new ClusterSchemaFetcher();
@@ -119,7 +120,7 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
 
     if (withTags) {
       return clusterSchemaFetchExecutor.executeSchemaFetchQuery(
-          new SchemaFetchStatement(patternTree, templateMap, withTags));
+          new SchemaFetchStatement(patternTree, templateMap, true));
     }
 
     List<PartialPath> fullPathList = new ArrayList<>();
@@ -131,7 +132,7 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
 
     if (fullPathList.isEmpty()) {
       return clusterSchemaFetchExecutor.executeSchemaFetchQuery(
-          new SchemaFetchStatement(patternTree, templateMap, withTags));
+          new SchemaFetchStatement(patternTree, templateMap, false));
     }
 
     // The schema cache R/W and fetch operation must be locked together thus the cache clean
@@ -165,7 +166,7 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
 
       schemaTree =
           clusterSchemaFetchExecutor.executeSchemaFetchQuery(
-              new SchemaFetchStatement(patternTree, templateMap, withTags));
+              new SchemaFetchStatement(patternTree, templateMap, false));
 
       // only cache the schema fetched by full path
       List<MeasurementPath> measurementPathList;