You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by zy...@apache.org on 2022/12/15 09:43:57 UTC

[iotdb] branch concurrent_schema_fetch created (now 0c5416ecfa)

This is an automated email from the ASF dual-hosted git repository.

zyk pushed a change to branch concurrent_schema_fetch
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at 0c5416ecfa fix bug

This branch includes the following new commits:

     new f4a8189aec add analyze.schema package
     new aa1f667ac1 extract AutoCreateSchemaExecutor and ClusterSchemaFetchExecutor
     new a37500f2f9 improve code structure of fetch schema for data insert and auto create schema
     new b5f5b028b5 save trial
     new 2b069950bc save trial
     new 0ebd32b922 save trial
     new ac49e5c19c basically implement
     new 0c5416ecfa fix bug

The 8 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 04/08: save trial

Posted by zy...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zyk pushed a commit to branch concurrent_schema_fetch
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit b5f5b028b5a0987032a306b2a3c4ab0f57bc5337
Author: Marccos <15...@qq.com>
AuthorDate: Wed Dec 14 15:32:02 2022 +0800

    save trial
---
 .../analyze/schema/ClusterSchemaFetchExecutor.java | 73 +++++++++++++++++++++-
 .../plan/analyze/schema/ClusterSchemaFetcher.java  |  9 +--
 2 files changed, 77 insertions(+), 5 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetchExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetchExecutor.java
index 9b0d3d7c81..75fd5623ac 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetchExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetchExecutor.java
@@ -21,6 +21,9 @@ package org.apache.iotdb.db.mpp.plan.analyze.schema;
 
 import org.apache.iotdb.commons.exception.IoTDBException;
 import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.path.PathPatternTree;
+import org.apache.iotdb.db.metadata.template.Template;
 import org.apache.iotdb.db.mpp.common.schematree.ClusterSchemaTree;
 import org.apache.iotdb.db.mpp.plan.Coordinator;
 import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
@@ -31,15 +34,22 @@ import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.common.block.column.Column;
 import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.BiFunction;
+import java.util.function.Function;
 import java.util.function.Supplier;
 
 class ClusterSchemaFetchExecutor {
@@ -47,14 +57,75 @@ class ClusterSchemaFetchExecutor {
   private final Coordinator coordinator;
   private final Supplier<Long> queryIdProvider;
   private final BiFunction<Long, Statement, ExecutionResult> statementExecutor;
+  private final Function<PartialPath, Map<Integer, Template>> templateSetInfoProvider;
+
+  private final Map<PartialPath, Pair<AtomicInteger, ClusterSchemaTree>> fetchedResultMap = new ConcurrentHashMap<>();
+
+  private final Map<PartialPath, Pair<AtomicInteger, Set<String>>> executingTaskMap = new ConcurrentHashMap<>();
+
+  private final Map<PartialPath, Pair<AtomicInteger, Set<String>>> waitingTaskMap = new ConcurrentHashMap<>();
 
   ClusterSchemaFetchExecutor(
       Coordinator coordinator,
       Supplier<Long> queryIdProvider,
-      BiFunction<Long, Statement, ExecutionResult> statementExecutor) {
+      BiFunction<Long, Statement, ExecutionResult> statementExecutor,
+      Function<PartialPath, Map<Integer, Template>> templateSetInfoProvider) {
     this.coordinator = coordinator;
     this.queryIdProvider = queryIdProvider;
     this.statementExecutor = statementExecutor;
+    this.templateSetInfoProvider = templateSetInfoProvider;
+  }
+
+  ClusterSchemaTree fetchSchemaOfOneDevice(PartialPath devicePath, List<String> measurements){
+
+    final AtomicBoolean shouldWait = new AtomicBoolean(true);
+    executingTaskMap.compute(devicePath, (key, value) -> {
+      if (value == null){
+        return null;
+      }
+      if (value.right.size() < measurements.size()){
+        shouldWait.set(false);
+        return value;
+      }
+      for (String measurement: measurements){
+        if (!value.right.contains(measurement)){
+          shouldWait.set(false);
+          return value;
+        }
+      }
+      value.left.getAndIncrement();
+      return value;
+    });
+    if (shouldWait.get()){
+      while (!fetchedResultMap.containsKey(devicePath));
+      ClusterSchemaTree schemaTree = new ClusterSchemaTree();
+      Pair<AtomicInteger, ClusterSchemaTree> pair = fetchedResultMap.get(devicePath);
+      if (pair.left.decrementAndGet() == 0){
+        fetchedResultMap.remove(devicePath);
+      }
+    }else {
+      Pair<AtomicInteger, Set<String>> task = waitingTaskMap.compute(devicePath, (key, value) -> {
+        if (value == null){
+          value = new Pair<>(new AtomicInteger(0), new HashSet<>());
+        }
+        value.left.getAndIncrement();
+        value.right.addAll(measurements);
+        return value;
+      });
+      synchronized (task){
+        while (executingTaskMap.computeIfAbsent(devicePath, key -> task)!=task);
+        PathPatternTree patternTree = new PathPatternTree();
+        for (String measurement: task.right){
+          patternTree.appendFullPath(devicePath, measurement);
+        }
+        ClusterSchemaTree fetchedSchemaTree = executeSchemaFetchQuery(new SchemaFetchStatement(patternTree, templateSetInfoProvider.apply(devicePath), false));
+        Pair<AtomicInteger, ClusterSchemaTree> fetchSchemaResult = new Pair<>(task.left, fetchedSchemaTree);
+        while (fetchedResultMap.computeIfAbsent(devicePath, key -> fetchSchemaResult) != fetchSchemaResult);
+
+      }
+    }
+
+    return null;
   }
 
   ClusterSchemaTree executeSchemaFetchQuery(SchemaFetchStatement schemaFetchStatement) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetcher.java
index 4d5a7e498d..564b97bfba 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetcher.java
@@ -83,7 +83,8 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
                   "",
                   ClusterPartitionFetcher.getInstance(),
                   this,
-                  config.getQueryTimeoutThreshold()));
+                  config.getQueryTimeoutThreshold()),
+              templateManager::checkAllRelatedTemplate);
 
   private static final class ClusterSchemaFetcherHolder {
     private static final ClusterSchemaFetcher INSTANCE = new ClusterSchemaFetcher();
@@ -119,7 +120,7 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
 
     if (withTags) {
       return clusterSchemaFetchExecutor.executeSchemaFetchQuery(
-          new SchemaFetchStatement(patternTree, templateMap, withTags));
+          new SchemaFetchStatement(patternTree, templateMap, true));
     }
 
     List<PartialPath> fullPathList = new ArrayList<>();
@@ -131,7 +132,7 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
 
     if (fullPathList.isEmpty()) {
       return clusterSchemaFetchExecutor.executeSchemaFetchQuery(
-          new SchemaFetchStatement(patternTree, templateMap, withTags));
+          new SchemaFetchStatement(patternTree, templateMap, false));
     }
 
     // The schema cache R/W and fetch operation must be locked together thus the cache clean
@@ -165,7 +166,7 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
 
       schemaTree =
           clusterSchemaFetchExecutor.executeSchemaFetchQuery(
-              new SchemaFetchStatement(patternTree, templateMap, withTags));
+              new SchemaFetchStatement(patternTree, templateMap, false));
 
       // only cache the schema fetched by full path
       List<MeasurementPath> measurementPathList;


[iotdb] 03/08: improve code structure of fetch schema for data insert and auto create schema

Posted by zy...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zyk pushed a commit to branch concurrent_schema_fetch
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit a37500f2f9a624ae4b220b76dad63b8b4ecdda88
Author: Marccos <15...@qq.com>
AuthorDate: Wed Dec 14 10:13:33 2022 +0800

    improve code structure of fetch schema for data insert and auto create schema
---
 .../analyze/schema/AutoCreateSchemaExecutor.java   |  99 +++++++++++++++-
 .../plan/analyze/schema/ClusterSchemaFetcher.java  | 127 ++++++---------------
 2 files changed, 134 insertions(+), 92 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/AutoCreateSchemaExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/AutoCreateSchemaExecutor.java
index b5389698b5..b521a355e7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/AutoCreateSchemaExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/AutoCreateSchemaExecutor.java
@@ -25,36 +25,129 @@ import org.apache.iotdb.commons.exception.MetadataException;
 import org.apache.iotdb.commons.path.MeasurementPath;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.exception.sql.SemanticException;
+import org.apache.iotdb.db.metadata.template.Template;
 import org.apache.iotdb.db.mpp.common.schematree.ClusterSchemaTree;
 import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
 import org.apache.iotdb.db.mpp.plan.statement.Statement;
 import org.apache.iotdb.db.mpp.plan.statement.internal.InternalCreateTimeSeriesStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.template.ActivateTemplateStatement;
 import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
 import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
+import static org.apache.iotdb.db.utils.EncodingInferenceUtils.getDefaultEncoding;
+
 class AutoCreateSchemaExecutor {
 
   private final Function<Statement, ExecutionResult> statementExecutor;
+  private final Function<PartialPath, Pair<Template, PartialPath>> templateSetInfoProvider;
 
-  AutoCreateSchemaExecutor(Function<Statement, ExecutionResult> statementExecutor) {
+  AutoCreateSchemaExecutor(
+      Function<Statement, ExecutionResult> statementExecutor,
+      Function<PartialPath, Pair<Template, PartialPath>> templateSetInfoProvider) {
     this.statementExecutor = statementExecutor;
+    this.templateSetInfoProvider = templateSetInfoProvider;
+  }
+
+  void autoCreateSchema(
+      ClusterSchemaTree schemaTree,
+      PartialPath devicePath,
+      List<Integer> indexOfMissingMeasurements,
+      String[] measurements,
+      Function<Integer, TSDataType> getDataType,
+      TSEncoding[] encodings,
+      CompressionType[] compressionTypes,
+      boolean isAligned) {
+    // check whether there is template should be activated
+    Pair<Template, PartialPath> templateInfo = templateSetInfoProvider.apply(devicePath);
+    if (templateInfo != null) {
+      Template template = templateInfo.left;
+      boolean shouldActivateTemplate = false;
+      for (int index : indexOfMissingMeasurements) {
+        if (template.hasSchema(measurements[index])) {
+          shouldActivateTemplate = true;
+          break;
+        }
+      }
+
+      if (shouldActivateTemplate) {
+        internalActivateTemplate(devicePath);
+        List<Integer> recheckedIndexOfMissingMeasurements = new ArrayList<>();
+        for (int i = 0; i < indexOfMissingMeasurements.size(); i++) {
+          if (!template.hasSchema(measurements[i])) {
+            recheckedIndexOfMissingMeasurements.add(indexOfMissingMeasurements.get(i));
+          }
+        }
+        indexOfMissingMeasurements = recheckedIndexOfMissingMeasurements;
+        for (Map.Entry<String, IMeasurementSchema> entry : template.getSchemaMap().entrySet()) {
+          schemaTree.appendSingleMeasurement(
+              devicePath.concatNode(entry.getKey()),
+              (MeasurementSchema) entry.getValue(),
+              null,
+              null,
+              template.isDirectAligned());
+        }
+
+        if (indexOfMissingMeasurements.isEmpty()) {
+          return;
+        }
+      }
+    }
+
+    // auto create the rest missing timeseries
+    List<String> missingMeasurements = new ArrayList<>(indexOfMissingMeasurements.size());
+    List<TSDataType> dataTypesOfMissingMeasurement =
+        new ArrayList<>(indexOfMissingMeasurements.size());
+    List<TSEncoding> encodingsOfMissingMeasurement =
+        new ArrayList<>(indexOfMissingMeasurements.size());
+    List<CompressionType> compressionTypesOfMissingMeasurement =
+        new ArrayList<>(indexOfMissingMeasurements.size());
+    indexOfMissingMeasurements.forEach(
+        index -> {
+          TSDataType tsDataType = getDataType.apply(index);
+          // tsDataType == null means insert null value to a non-exist series
+          // should skip creating them
+          if (tsDataType != null) {
+            missingMeasurements.add(measurements[index]);
+            dataTypesOfMissingMeasurement.add(tsDataType);
+            encodingsOfMissingMeasurement.add(
+                encodings == null ? getDefaultEncoding(tsDataType) : encodings[index]);
+            compressionTypesOfMissingMeasurement.add(
+                compressionTypes == null
+                    ? TSFileDescriptor.getInstance().getConfig().getCompressor()
+                    : compressionTypes[index]);
+          }
+        });
+
+    if (!missingMeasurements.isEmpty()) {
+      schemaTree.mergeSchemaTree(
+          internalCreateTimeseries(
+              devicePath,
+              missingMeasurements,
+              dataTypesOfMissingMeasurement,
+              encodingsOfMissingMeasurement,
+              compressionTypesOfMissingMeasurement,
+              isAligned));
+    }
   }
 
   // try to create the target timeseries and return schemaTree involving successfully created
   // timeseries and existing timeseries
-  ClusterSchemaTree internalCreateTimeseries(
+  private ClusterSchemaTree internalCreateTimeseries(
       PartialPath devicePath,
       List<String> measurements,
       List<TSDataType> tsDataTypes,
@@ -125,7 +218,7 @@ class AutoCreateSchemaExecutor {
     return alreadyExistingMeasurements;
   }
 
-  void internalActivateTemplate(PartialPath devicePath) {
+  private void internalActivateTemplate(PartialPath devicePath) {
     ExecutionResult executionResult =
         statementExecutor.apply(new ActivateTemplateStatement(devicePath));
     TSStatus status = executionResult.status;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetcher.java
index 3f88d88825..4d5a7e498d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetcher.java
@@ -34,12 +34,10 @@ import org.apache.iotdb.db.mpp.plan.Coordinator;
 import org.apache.iotdb.db.mpp.plan.analyze.ClusterPartitionFetcher;
 import org.apache.iotdb.db.mpp.plan.statement.internal.SchemaFetchStatement;
 import org.apache.iotdb.db.query.control.SessionManager;
-import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
 import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.iotdb.tsfile.utils.Pair;
-import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
 import java.util.ArrayList;
@@ -53,8 +51,6 @@ import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
-import static org.apache.iotdb.db.utils.EncodingInferenceUtils.getDefaultEncoding;
-
 public class ClusterSchemaFetcher implements ISchemaFetcher {
 
   private final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
@@ -73,7 +69,8 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
                   "",
                   ClusterPartitionFetcher.getInstance(),
                   this,
-                  config.getQueryTimeoutThreshold()));
+                  config.getQueryTimeoutThreshold()),
+          templateManager::checkTemplateSetInfo);
   private final ClusterSchemaFetchExecutor clusterSchemaFetchExecutor =
       new ClusterSchemaFetchExecutor(
           coordinator,
@@ -102,15 +99,17 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
 
   @Override
   public ClusterSchemaTree fetchSchema(PathPatternTree patternTree) {
-    return fetchSchema(patternTree, false);
+    return checkPatternTreeAndFetchSchema(patternTree, false);
   }
 
   @Override
   public ClusterSchemaTree fetchSchemaWithTags(PathPatternTree patternTree) {
-    return fetchSchema(patternTree, true);
+    return checkPatternTreeAndFetchSchema(patternTree, true);
   }
 
-  private ClusterSchemaTree fetchSchema(PathPatternTree patternTree, boolean withTags) {
+  // used for patternTree that may have wildcard, mainly for data query
+  private ClusterSchemaTree checkPatternTreeAndFetchSchema(
+      PathPatternTree patternTree, boolean withTags) {
     Map<Integer, Template> templateMap = new HashMap<>();
     patternTree.constructTree();
     List<PartialPath> pathPatternList = patternTree.getAllPathPatterns();
@@ -208,7 +207,7 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
       for (int index : indexOfMissingMeasurements) {
         patternTree.appendFullPath(devicePath, measurements[index]);
       }
-      ClusterSchemaTree remoteSchemaTree = fetchSchema(patternTree);
+      ClusterSchemaTree remoteSchemaTree = fetchSchemaFromRemote(patternTree);
       if (!remoteSchemaTree.isEmpty()) {
         schemaTree.mergeSchemaTree(remoteSchemaTree);
         schemaCache.put(remoteSchemaTree);
@@ -276,7 +275,7 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
       }
 
       // try fetch the missing schema from remote and cache fetched schema
-      ClusterSchemaTree remoteSchemaTree = fetchSchema(patternTree);
+      ClusterSchemaTree remoteSchemaTree = fetchSchemaFromRemote(patternTree);
       if (!remoteSchemaTree.isEmpty()) {
         schemaTree.mergeSchemaTree(remoteSchemaTree);
         schemaCache.put(remoteSchemaTree);
@@ -320,6 +319,18 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
     return templateManager.getAllPathsSetTemplate(templateName);
   }
 
+  // used for patternTree without wildcard, mainly for data insert and load tsFile
+  private ClusterSchemaTree fetchSchemaFromRemote(PathPatternTree patternTree) {
+    Map<Integer, Template> templateMap = new HashMap<>();
+    patternTree.constructTree();
+    List<PartialPath> pathPatternList = patternTree.getAllPathPatterns();
+    for (PartialPath pattern : pathPatternList) {
+      templateMap.putAll(templateManager.checkAllRelatedTemplate(pattern));
+    }
+    return clusterSchemaFetchExecutor.executeSchemaFetchQuery(
+        new SchemaFetchStatement(patternTree, templateMap, false));
+  }
+
   // check which measurements are missing and auto create the missing measurements and merge them
   // into given schemaTree
   private void checkAndAutoCreateMissingMeasurements(
@@ -338,90 +349,28 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
             indexOfMissingMeasurements.stream()
                 .map(index -> measurements[index])
                 .collect(Collectors.toList()));
-    if (deviceSchemaInfo != null) {
+    List<Integer> recheckedIndexOfMissingMeasurements;
+    if (deviceSchemaInfo == null) {
+      recheckedIndexOfMissingMeasurements = indexOfMissingMeasurements;
+    } else {
+      recheckedIndexOfMissingMeasurements = new ArrayList<>(indexOfMissingMeasurements.size());
       List<MeasurementSchema> schemaList = deviceSchemaInfo.getMeasurementSchemaList();
-      int removedCount = 0;
       for (int i = 0, size = schemaList.size(); i < size; i++) {
-        if (schemaList.get(i) != null) {
-          indexOfMissingMeasurements.remove(i - removedCount);
-          removedCount++;
+        if (schemaList.get(i) == null) {
+          recheckedIndexOfMissingMeasurements.add(indexOfMissingMeasurements.get(i));
         }
       }
     }
-    if (indexOfMissingMeasurements.isEmpty()) {
-      return;
-    }
-
-    // check whether there is template should be activated
-    Pair<Template, PartialPath> templateInfo = templateManager.checkTemplateSetInfo(devicePath);
-    if (templateInfo != null) {
-      Template template = templateInfo.left;
-      boolean shouldActivateTemplate = false;
-      for (int index : indexOfMissingMeasurements) {
-        if (template.hasSchema(measurements[index])) {
-          shouldActivateTemplate = true;
-          break;
-        }
-      }
-
-      if (shouldActivateTemplate) {
-        autoCreateSchemaExecutor.internalActivateTemplate(devicePath);
-        List<Integer> recheckedIndexOfMissingMeasurements = new ArrayList<>();
-        for (int i = 0; i < indexOfMissingMeasurements.size(); i++) {
-          if (!template.hasSchema(measurements[i])) {
-            recheckedIndexOfMissingMeasurements.add(indexOfMissingMeasurements.get(i));
-          }
-        }
-        indexOfMissingMeasurements = recheckedIndexOfMissingMeasurements;
-        for (Map.Entry<String, IMeasurementSchema> entry : template.getSchemaMap().entrySet()) {
-          schemaTree.appendSingleMeasurement(
-              devicePath.concatNode(entry.getKey()),
-              (MeasurementSchema) entry.getValue(),
-              null,
-              null,
-              template.isDirectAligned());
-        }
-
-        if (indexOfMissingMeasurements.isEmpty()) {
-          return;
-        }
-      }
-    }
-
-    // auto create the rest missing timeseries
-    List<String> missingMeasurements = new ArrayList<>(indexOfMissingMeasurements.size());
-    List<TSDataType> dataTypesOfMissingMeasurement =
-        new ArrayList<>(indexOfMissingMeasurements.size());
-    List<TSEncoding> encodingsOfMissingMeasurement =
-        new ArrayList<>(indexOfMissingMeasurements.size());
-    List<CompressionType> compressionTypesOfMissingMeasurement =
-        new ArrayList<>(indexOfMissingMeasurements.size());
-    indexOfMissingMeasurements.forEach(
-        index -> {
-          TSDataType tsDataType = getDataType.apply(index);
-          // tsDataType == null means insert null value to a non-exist series
-          // should skip creating them
-          if (tsDataType != null) {
-            missingMeasurements.add(measurements[index]);
-            dataTypesOfMissingMeasurement.add(tsDataType);
-            encodingsOfMissingMeasurement.add(
-                encodings == null ? getDefaultEncoding(tsDataType) : encodings[index]);
-            compressionTypesOfMissingMeasurement.add(
-                compressionTypes == null
-                    ? TSFileDescriptor.getInstance().getConfig().getCompressor()
-                    : compressionTypes[index]);
-          }
-        });
-
-    if (!missingMeasurements.isEmpty()) {
-      schemaTree.mergeSchemaTree(
-          autoCreateSchemaExecutor.internalCreateTimeseries(
-              devicePath,
-              missingMeasurements,
-              dataTypesOfMissingMeasurement,
-              encodingsOfMissingMeasurement,
-              compressionTypesOfMissingMeasurement,
-              isAligned));
+    if (!recheckedIndexOfMissingMeasurements.isEmpty()) {
+      autoCreateSchemaExecutor.autoCreateSchema(
+          schemaTree,
+          devicePath,
+          recheckedIndexOfMissingMeasurements,
+          measurements,
+          getDataType,
+          encodings,
+          compressionTypes,
+          isAligned);
     }
   }
 


[iotdb] 06/08: save trial

Posted by zy...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zyk pushed a commit to branch concurrent_schema_fetch
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 0ebd32b922e10130f62b14c1d4bd41aba0a9fce7
Author: Marccos <15...@qq.com>
AuthorDate: Wed Dec 14 20:19:48 2022 +0800

    save trial
---
 .../analyze/schema/ClusterSchemaFetchExecutor.java | 50 +++++++++++++++++++++-
 1 file changed, 49 insertions(+), 1 deletion(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetchExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetchExecutor.java
index 2008dadaeb..7f25b2f640 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetchExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetchExecutor.java
@@ -48,6 +48,8 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.BiFunction;
 import java.util.function.Function;
 import java.util.function.Supplier;
@@ -79,7 +81,6 @@ class ClusterSchemaFetchExecutor {
   }
 
   ClusterSchemaTree fetchSchemaOfOneDevice(PartialPath devicePath, List<String> measurements) {
-
     final AtomicBoolean shouldWait = new AtomicBoolean(true);
     executingTaskMap.compute(
         devicePath,
@@ -204,6 +205,53 @@ class ClusterSchemaFetchExecutor {
     }
   }
 
+  private class DeviceSchemaFetchTaskExecutor {
+
+    private volatile DeviceSchemaFetchTask waitingTask;
+
+    private volatile DeviceSchemaFetchTask executingTask;
+
+    private volatile int restThreadNum;
+
+    private volatile ClusterSchemaTree fetchedResult;
+
+    private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+
+    private ClusterSchemaTree execute(List<String> measurements) {
+      boolean needNewTask = false;
+      readWriteLock.readLock().lock();
+      try {
+        if (executingTask != null) {
+          needNewTask = !executingTask.checkAndAddWaitingThread(measurements);
+        }
+        if (needNewTask) {
+          DeviceSchemaFetchTask task = waitingTask;
+          synchronized (this) {
+            readWriteLock.readLock().unlock();
+            readWriteLock.writeLock().lock();
+            try {
+              if (waitingTask == null) {
+                waitingTask = new DeviceSchemaFetchTask();
+                task = waitingTask;
+              }
+              waitingTask.addWaitingThread(measurements);
+            } finally {
+              readWriteLock.writeLock().unlock();
+            }
+          }
+          synchronized (task) {
+          }
+        }
+      } finally {
+        readWriteLock.readLock().unlock();
+      }
+
+      if (!executingTask.checkAndAddWaitingThread(measurements)) {}
+
+      return null;
+    }
+  }
+
   private static class DeviceSchemaFetchTask {
 
     private int waitingThreadNum = 0;


[iotdb] 02/08: extract AutoCreateSchemaExecutor and ClusterSchemaFetchExecutor

Posted by zy...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zyk pushed a commit to branch concurrent_schema_fetch
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit aa1f667ac1745c216bbef22fbf9954799ea5b1f3
Author: Marccos <15...@qq.com>
AuthorDate: Mon Dec 12 11:33:19 2022 +0800

    extract AutoCreateSchemaExecutor and ClusterSchemaFetchExecutor
---
 .../analyze/schema/AutoCreateSchemaExecutor.java   | 137 +++++++++++++
 .../analyze/schema/ClusterSchemaFetchExecutor.java | 118 +++++++++++
 .../plan/analyze/schema/ClusterSchemaFetcher.java  | 223 ++++-----------------
 3 files changed, 292 insertions(+), 186 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/AutoCreateSchemaExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/AutoCreateSchemaExecutor.java
new file mode 100644
index 0000000000..b5389698b5
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/AutoCreateSchemaExecutor.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.analyze.schema;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.exception.IoTDBException;
+import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.commons.path.MeasurementPath;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.exception.sql.SemanticException;
+import org.apache.iotdb.db.mpp.common.schematree.ClusterSchemaTree;
+import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
+import org.apache.iotdb.db.mpp.plan.statement.Statement;
+import org.apache.iotdb.db.mpp.plan.statement.internal.InternalCreateTimeSeriesStatement;
+import org.apache.iotdb.db.mpp.plan.statement.metadata.template.ActivateTemplateStatement;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+class AutoCreateSchemaExecutor {
+
+  private final Function<Statement, ExecutionResult> statementExecutor;
+
+  AutoCreateSchemaExecutor(Function<Statement, ExecutionResult> statementExecutor) {
+    this.statementExecutor = statementExecutor;
+  }
+
+  // try to create the target timeseries and return schemaTree involving successfully created
+  // timeseries and existing timeseries
+  ClusterSchemaTree internalCreateTimeseries(
+      PartialPath devicePath,
+      List<String> measurements,
+      List<TSDataType> tsDataTypes,
+      List<TSEncoding> encodings,
+      List<CompressionType> compressors,
+      boolean isAligned) {
+    List<MeasurementPath> measurementPathList =
+        executeInternalCreateTimeseriesStatement(
+            new InternalCreateTimeSeriesStatement(
+                devicePath, measurements, tsDataTypes, encodings, compressors, isAligned));
+
+    Set<Integer> alreadyExistingMeasurementIndexSet =
+        measurementPathList.stream()
+            .map(o -> measurements.indexOf(o.getMeasurement()))
+            .collect(Collectors.toSet());
+
+    ClusterSchemaTree schemaTree = new ClusterSchemaTree();
+    schemaTree.appendMeasurementPaths(measurementPathList);
+
+    for (int i = 0, size = measurements.size(); i < size; i++) {
+      if (alreadyExistingMeasurementIndexSet.contains(i)) {
+        continue;
+      }
+
+      schemaTree.appendSingleMeasurement(
+          devicePath.concatNode(measurements.get(i)),
+          new MeasurementSchema(
+              measurements.get(i), tsDataTypes.get(i), encodings.get(i), compressors.get(i)),
+          null,
+          null,
+          isAligned);
+    }
+
+    return schemaTree;
+  }
+
+  // auto create timeseries and return the existing timeseries info
+  private List<MeasurementPath> executeInternalCreateTimeseriesStatement(
+      InternalCreateTimeSeriesStatement statement) {
+
+    ExecutionResult executionResult = statementExecutor.apply(statement);
+
+    int statusCode = executionResult.status.getCode();
+    if (statusCode == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      return Collections.emptyList();
+    }
+
+    if (statusCode != TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
+      throw new RuntimeException(
+          new IoTDBException(executionResult.status.getMessage(), statusCode));
+    }
+
+    Set<String> failedCreationSet = new HashSet<>();
+    List<MeasurementPath> alreadyExistingMeasurements = new ArrayList<>();
+    for (TSStatus subStatus : executionResult.status.subStatus) {
+      if (subStatus.code == TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()) {
+        alreadyExistingMeasurements.add(
+            MeasurementPath.parseDataFromString(subStatus.getMessage()));
+      } else {
+        failedCreationSet.add(subStatus.message);
+      }
+    }
+
+    if (!failedCreationSet.isEmpty()) {
+      throw new SemanticException(new MetadataException(String.join("; ", failedCreationSet)));
+    }
+
+    return alreadyExistingMeasurements;
+  }
+
+  void internalActivateTemplate(PartialPath devicePath) {
+    ExecutionResult executionResult =
+        statementExecutor.apply(new ActivateTemplateStatement(devicePath));
+    TSStatus status = executionResult.status;
+    if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
+        && status.getCode() != TSStatusCode.TEMPLATE_IS_IN_USE.getStatusCode()) {
+      throw new RuntimeException(new IoTDBException(status.getMessage(), status.getCode()));
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetchExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetchExecutor.java
new file mode 100644
index 0000000000..9b0d3d7c81
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetchExecutor.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.analyze.schema;
+
+import org.apache.iotdb.commons.exception.IoTDBException;
+import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.db.mpp.common.schematree.ClusterSchemaTree;
+import org.apache.iotdb.db.mpp.plan.Coordinator;
+import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
+import org.apache.iotdb.db.mpp.plan.statement.Statement;
+import org.apache.iotdb.db.mpp.plan.statement.internal.SchemaFetchStatement;
+import org.apache.iotdb.db.utils.SetThreadName;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.BiFunction;
+import java.util.function.Supplier;
+
+class ClusterSchemaFetchExecutor {
+
+  private final Coordinator coordinator;
+  private final Supplier<Long> queryIdProvider;
+  private final BiFunction<Long, Statement, ExecutionResult> statementExecutor;
+
+  ClusterSchemaFetchExecutor(
+      Coordinator coordinator,
+      Supplier<Long> queryIdProvider,
+      BiFunction<Long, Statement, ExecutionResult> statementExecutor) {
+    this.coordinator = coordinator;
+    this.queryIdProvider = queryIdProvider;
+    this.statementExecutor = statementExecutor;
+  }
+
+  ClusterSchemaTree executeSchemaFetchQuery(SchemaFetchStatement schemaFetchStatement) {
+    long queryId = queryIdProvider.get();
+    try {
+      ExecutionResult executionResult = statementExecutor.apply(queryId, schemaFetchStatement);
+      if (executionResult.status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        throw new RuntimeException(
+            String.format(
+                "cannot fetch schema, status is: %s, msg is: %s",
+                executionResult.status.getCode(), executionResult.status.getMessage()));
+      }
+      try (SetThreadName threadName = new SetThreadName(executionResult.queryId.getId())) {
+        ClusterSchemaTree result = new ClusterSchemaTree();
+        Set<String> databaseSet = new HashSet<>();
+        while (coordinator.getQueryExecution(queryId).hasNextResult()) {
+          // The query transitions to FINISHED on the last invocation of getBatchResult(),
+          // so we don't need to clean it up manually.
+          Optional<TsBlock> tsBlock;
+          try {
+            tsBlock = coordinator.getQueryExecution(queryId).getBatchResult();
+          } catch (IoTDBException e) {
+            throw new RuntimeException("Fetch Schema failed. ", e);
+          }
+          if (!tsBlock.isPresent() || tsBlock.get().isEmpty()) {
+            break;
+          }
+          Column column = tsBlock.get().getColumn(0);
+          for (int i = 0; i < column.getPositionCount(); i++) {
+            parseFetchedData(column.getBinary(i), result, databaseSet);
+          }
+        }
+        result.setDatabases(databaseSet);
+        return result;
+      }
+    } finally {
+      coordinator.cleanupQueryExecution(queryId);
+    }
+  }
+
+  private void parseFetchedData(
+      Binary data, ClusterSchemaTree resultSchemaTree, Set<String> databaseSet) {
+    InputStream inputStream = new ByteArrayInputStream(data.getValues());
+    try {
+      byte type = ReadWriteIOUtils.readByte(inputStream);
+      if (type == 0) {
+        int size = ReadWriteIOUtils.readInt(inputStream);
+        for (int i = 0; i < size; i++) {
+          databaseSet.add(ReadWriteIOUtils.readString(inputStream));
+        }
+      } else if (type == 1) {
+        resultSchemaTree.mergeSchemaTree(ClusterSchemaTree.deserialize(inputStream));
+      } else {
+        throw new RuntimeException(
+            new MetadataException("Failed to fetch schema because of unrecognized data"));
+      }
+    } catch (IOException e) {
+      // Purely in-memory operation; this case won't happen.
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetcher.java
index 67fc47d941..3f88d88825 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetcher.java
@@ -18,15 +18,11 @@
  */
 package org.apache.iotdb.db.mpp.plan.analyze.schema;
 
-import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.commons.exception.IoTDBException;
-import org.apache.iotdb.commons.exception.MetadataException;
 import org.apache.iotdb.commons.path.MeasurementPath;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.path.PathPatternTree;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.exception.sql.SemanticException;
 import org.apache.iotdb.db.metadata.cache.DataNodeSchemaCache;
 import org.apache.iotdb.db.metadata.template.ClusterTemplateManager;
 import org.apache.iotdb.db.metadata.template.ITemplateManager;
@@ -36,37 +32,22 @@ import org.apache.iotdb.db.mpp.common.schematree.DeviceSchemaInfo;
 import org.apache.iotdb.db.mpp.common.schematree.ISchemaTree;
 import org.apache.iotdb.db.mpp.plan.Coordinator;
 import org.apache.iotdb.db.mpp.plan.analyze.ClusterPartitionFetcher;
-import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
-import org.apache.iotdb.db.mpp.plan.statement.Statement;
-import org.apache.iotdb.db.mpp.plan.statement.internal.InternalCreateTimeSeriesStatement;
 import org.apache.iotdb.db.mpp.plan.statement.internal.SchemaFetchStatement;
-import org.apache.iotdb.db.mpp.plan.statement.metadata.template.ActivateTemplateStatement;
 import org.apache.iotdb.db.query.control.SessionManager;
-import org.apache.iotdb.db.utils.SetThreadName;
-import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
 import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
-import org.apache.iotdb.tsfile.read.common.block.TsBlock;
-import org.apache.iotdb.tsfile.read.common.block.column.Column;
-import org.apache.iotdb.tsfile.utils.Binary;
 import org.apache.iotdb.tsfile.utils.Pair;
-import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -77,11 +58,36 @@ import static org.apache.iotdb.db.utils.EncodingInferenceUtils.getDefaultEncodin
 public class ClusterSchemaFetcher implements ISchemaFetcher {
 
   private final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
-
   private final Coordinator coordinator = Coordinator.getInstance();
+
   private final DataNodeSchemaCache schemaCache = DataNodeSchemaCache.getInstance();
   private final ITemplateManager templateManager = ClusterTemplateManager.getInstance();
 
+  private final AutoCreateSchemaExecutor autoCreateSchemaExecutor =
+      new AutoCreateSchemaExecutor(
+          statement ->
+              coordinator.execute(
+                  statement,
+                  SessionManager.getInstance().requestQueryId(),
+                  null,
+                  "",
+                  ClusterPartitionFetcher.getInstance(),
+                  this,
+                  config.getQueryTimeoutThreshold()));
+  private final ClusterSchemaFetchExecutor clusterSchemaFetchExecutor =
+      new ClusterSchemaFetchExecutor(
+          coordinator,
+          () -> SessionManager.getInstance().requestQueryId(),
+          (queryId, statement) ->
+              coordinator.execute(
+                  statement,
+                  queryId,
+                  null,
+                  "",
+                  ClusterPartitionFetcher.getInstance(),
+                  this,
+                  config.getQueryTimeoutThreshold()));
+
   private static final class ClusterSchemaFetcherHolder {
     private static final ClusterSchemaFetcher INSTANCE = new ClusterSchemaFetcher();
 
@@ -113,7 +119,8 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
     }
 
     if (withTags) {
-      return executeSchemaFetchQuery(new SchemaFetchStatement(patternTree, templateMap, withTags));
+      return clusterSchemaFetchExecutor.executeSchemaFetchQuery(
+          new SchemaFetchStatement(patternTree, templateMap, withTags));
     }
 
     List<PartialPath> fullPathList = new ArrayList<>();
@@ -124,7 +131,8 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
     }
 
     if (fullPathList.isEmpty()) {
-      return executeSchemaFetchQuery(new SchemaFetchStatement(patternTree, templateMap, withTags));
+      return clusterSchemaFetchExecutor.executeSchemaFetchQuery(
+          new SchemaFetchStatement(patternTree, templateMap, withTags));
     }
 
     // The schema cache R/W and fetch operation must be locked together thus the cache clean
@@ -148,13 +156,17 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
           }
         }
         if (isAllCached) {
+          // The iteration order of HashMap entries depends to some extent on insertion order.
+          // Therefore, we must avoid merging cachedSchemaTree with fetchedSchemaTree,
+          // since the cache state varies among DataNodes.
           schemaTree.setDatabases(storageGroupSet);
           return schemaTree;
         }
       }
 
       schemaTree =
-          executeSchemaFetchQuery(new SchemaFetchStatement(patternTree, templateMap, withTags));
+          clusterSchemaFetchExecutor.executeSchemaFetchQuery(
+              new SchemaFetchStatement(patternTree, templateMap, withTags));
 
       // only cache the schema fetched by full path
       List<MeasurementPath> measurementPathList;
@@ -172,73 +184,6 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
     }
   }
 
-  private ClusterSchemaTree executeSchemaFetchQuery(SchemaFetchStatement schemaFetchStatement) {
-    long queryId = SessionManager.getInstance().requestQueryId();
-    try {
-      ExecutionResult executionResult =
-          coordinator.execute(
-              schemaFetchStatement,
-              queryId,
-              null,
-              "",
-              ClusterPartitionFetcher.getInstance(),
-              this,
-              config.getQueryTimeoutThreshold());
-      if (executionResult.status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-        throw new RuntimeException(
-            String.format(
-                "cannot fetch schema, status is: %s, msg is: %s",
-                executionResult.status.getCode(), executionResult.status.getMessage()));
-      }
-      try (SetThreadName threadName = new SetThreadName(executionResult.queryId.getId())) {
-        ClusterSchemaTree result = new ClusterSchemaTree();
-        Set<String> databaseSet = new HashSet<>();
-        while (coordinator.getQueryExecution(queryId).hasNextResult()) {
-          // The query will be transited to FINISHED when invoking getBatchResult() at the last time
-          // So we don't need to clean up it manually
-          Optional<TsBlock> tsBlock;
-          try {
-            tsBlock = coordinator.getQueryExecution(queryId).getBatchResult();
-          } catch (IoTDBException e) {
-            throw new RuntimeException("Fetch Schema failed. ", e);
-          }
-          if (!tsBlock.isPresent() || tsBlock.get().isEmpty()) {
-            break;
-          }
-          Column column = tsBlock.get().getColumn(0);
-          for (int i = 0; i < column.getPositionCount(); i++) {
-            parseFetchedData(column.getBinary(i), result, databaseSet);
-          }
-        }
-        result.setDatabases(databaseSet);
-        return result;
-      }
-    } finally {
-      coordinator.cleanupQueryExecution(queryId);
-    }
-  }
-
-  private void parseFetchedData(
-      Binary data, ClusterSchemaTree resultSchemaTree, Set<String> databaseSet) {
-    InputStream inputStream = new ByteArrayInputStream(data.getValues());
-    try {
-      byte type = ReadWriteIOUtils.readByte(inputStream);
-      if (type == 0) {
-        int size = ReadWriteIOUtils.readInt(inputStream);
-        for (int i = 0; i < size; i++) {
-          databaseSet.add(ReadWriteIOUtils.readString(inputStream));
-        }
-      } else if (type == 1) {
-        resultSchemaTree.mergeSchemaTree(ClusterSchemaTree.deserialize(inputStream));
-      } else {
-        throw new RuntimeException(
-            new MetadataException("Failed to fetch schema because of unrecognized data"));
-      }
-    } catch (IOException e) {
-      // Totally memory operation. This case won't happen.
-    }
-  }
-
   @Override
   public ISchemaTree fetchSchemaWithAutoCreate(
       PartialPath devicePath,
@@ -420,7 +365,7 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
       }
 
       if (shouldActivateTemplate) {
-        internalActivateTemplate(devicePath);
+        autoCreateSchemaExecutor.internalActivateTemplate(devicePath);
         List<Integer> recheckedIndexOfMissingMeasurements = new ArrayList<>();
         for (int i = 0; i < indexOfMissingMeasurements.size(); i++) {
           if (!template.hasSchema(measurements[i])) {
@@ -470,7 +415,7 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
 
     if (!missingMeasurements.isEmpty()) {
       schemaTree.mergeSchemaTree(
-          internalCreateTimeseries(
+          autoCreateSchemaExecutor.internalCreateTimeseries(
               devicePath,
               missingMeasurements,
               dataTypesOfMissingMeasurement,
@@ -499,100 +444,6 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
     return indexOfMissingMeasurements;
   }
 
-  // try to create the target timeseries and return schemaTree involving successfully created
-  // timeseries and existing timeseries
-  private ClusterSchemaTree internalCreateTimeseries(
-      PartialPath devicePath,
-      List<String> measurements,
-      List<TSDataType> tsDataTypes,
-      List<TSEncoding> encodings,
-      List<CompressionType> compressors,
-      boolean isAligned) {
-    List<MeasurementPath> measurementPathList =
-        executeInternalCreateTimeseriesStatement(
-            new InternalCreateTimeSeriesStatement(
-                devicePath, measurements, tsDataTypes, encodings, compressors, isAligned));
-
-    Set<Integer> alreadyExistingMeasurementIndexSet =
-        measurementPathList.stream()
-            .map(o -> measurements.indexOf(o.getMeasurement()))
-            .collect(Collectors.toSet());
-
-    ClusterSchemaTree schemaTree = new ClusterSchemaTree();
-    schemaTree.appendMeasurementPaths(measurementPathList);
-
-    for (int i = 0, size = measurements.size(); i < size; i++) {
-      if (alreadyExistingMeasurementIndexSet.contains(i)) {
-        continue;
-      }
-
-      schemaTree.appendSingleMeasurement(
-          devicePath.concatNode(measurements.get(i)),
-          new MeasurementSchema(
-              measurements.get(i), tsDataTypes.get(i), encodings.get(i), compressors.get(i)),
-          null,
-          null,
-          isAligned);
-    }
-
-    return schemaTree;
-  }
-
-  // auto create timeseries and return the existing timeseries info
-  private List<MeasurementPath> executeInternalCreateTimeseriesStatement(
-      InternalCreateTimeSeriesStatement statement) {
-
-    ExecutionResult executionResult = executeStatement(statement);
-
-    int statusCode = executionResult.status.getCode();
-    if (statusCode == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-      return Collections.emptyList();
-    }
-
-    if (statusCode != TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
-      throw new RuntimeException(
-          new IoTDBException(executionResult.status.getMessage(), statusCode));
-    }
-
-    Set<String> failedCreationSet = new HashSet<>();
-    List<MeasurementPath> alreadyExistingMeasurements = new ArrayList<>();
-    for (TSStatus subStatus : executionResult.status.subStatus) {
-      if (subStatus.code == TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()) {
-        alreadyExistingMeasurements.add(
-            MeasurementPath.parseDataFromString(subStatus.getMessage()));
-      } else {
-        failedCreationSet.add(subStatus.message);
-      }
-    }
-
-    if (!failedCreationSet.isEmpty()) {
-      throw new SemanticException(new MetadataException(String.join("; ", failedCreationSet)));
-    }
-
-    return alreadyExistingMeasurements;
-  }
-
-  public void internalActivateTemplate(PartialPath devicePath) {
-    ExecutionResult executionResult = executeStatement(new ActivateTemplateStatement(devicePath));
-    TSStatus status = executionResult.status;
-    if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
-        && status.getCode() != TSStatusCode.TEMPLATE_IS_IN_USE.getStatusCode()) {
-      throw new RuntimeException(new IoTDBException(status.getMessage(), status.getCode()));
-    }
-  }
-
-  private ExecutionResult executeStatement(Statement statement) {
-    long queryId = SessionManager.getInstance().requestQueryId();
-    return coordinator.execute(
-        statement,
-        queryId,
-        null,
-        "",
-        ClusterPartitionFetcher.getInstance(),
-        this,
-        config.getQueryTimeoutThreshold());
-  }
-
   @Override
   public void invalidAllCache() {
     DataNodeSchemaCache.getInstance().cleanUp();


[iotdb] 08/08: fix bug

Posted by zy...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zyk pushed a commit to branch concurrent_schema_fetch
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 0c5416ecfa36a7b1c69e12cf8c23fcd2605ca590
Author: Marccos <15...@qq.com>
AuthorDate: Thu Dec 15 17:39:46 2022 +0800

    fix bug
---
 .../db/mpp/common/schematree/ClusterSchemaTree.java | 14 +++++++++-----
 .../analyze/schema/ClusterSchemaFetchExecutor.java  | 20 +++++++++-----------
 .../common/schematree/ClusterSchemaTreeTest.java    | 21 +++++++++++++++++++++
 3 files changed, 39 insertions(+), 16 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/ClusterSchemaTree.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/ClusterSchemaTree.java
index 9556086fab..fc0243e607 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/ClusterSchemaTree.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/ClusterSchemaTree.java
@@ -39,6 +39,7 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Deque;
 import java.util.List;
 import java.util.Map;
@@ -327,10 +328,10 @@ public class ClusterSchemaTree implements ISchemaTree {
   }
 
   public ClusterSchemaTree extractDeviceSubTree(PartialPath devicePath, List<String> measurements) {
-    SchemaNode root = new SchemaInternalNode(PATH_ROOT);
+    SchemaNode clonedRoot = new SchemaInternalNode(PATH_ROOT);
     String[] nodes = devicePath.getNodes();
-    SchemaNode cur = root;
-    SchemaNode clonedCur = root;
+    SchemaNode cur = this.root;
+    SchemaNode clonedCur = clonedRoot;
     SchemaNode clonedChild;
     for (int i = 1; i < nodes.length - 1; i++) {
       cur = cur.getChild(nodes[i]);
@@ -350,12 +351,13 @@ public class ClusterSchemaTree implements ISchemaTree {
 
     SchemaEntityNode deviceNode = cur.getAsEntityNode();
     SchemaEntityNode clonedDeviceNode = new SchemaEntityNode(nodes[nodes.length - 1]);
+    clonedCur.addChild(nodes[nodes.length - 1], clonedDeviceNode);
     clonedDeviceNode.setAligned(deviceNode.isAligned());
     SchemaNode child;
     SchemaMeasurementNode measurementNode;
     for (String measurement : measurements) {
       child = deviceNode.getChild(measurement);
-      if (child.isMeasurement()) {
+      if (child != null && child.isMeasurement()) {
         measurementNode = child.getAsMeasurementNode();
         clonedDeviceNode.addChild(measurementNode.getName(), measurementNode);
         if (measurementNode.getAlias() != null) {
@@ -363,6 +365,8 @@ public class ClusterSchemaTree implements ISchemaTree {
         }
       }
     }
-    return new ClusterSchemaTree(root);
+    ClusterSchemaTree result = new ClusterSchemaTree(clonedRoot);
+    result.setDatabases(Collections.singleton(getBelongedDatabase(devicePath)));
+    return result;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetchExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetchExecutor.java
index a76f1a23c0..cee5775039 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetchExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetchExecutor.java
@@ -39,6 +39,7 @@ import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -80,15 +81,18 @@ class ClusterSchemaFetchExecutor {
                 (key, value) -> {
                   if (value == null) {
                     value = new DeviceSchemaFetchTaskExecutor();
-                    value.incReferenceCount();
                   }
+                  value.incReferenceCount();
                   return value;
                 })
             .execute(devicePath, measurements);
     deviceSchemaFetchTaskExecutorMap.compute(
         devicePath,
         (key, value) -> {
-          if (value == null || value.decAndGetReferenceCount() == 0) {
+          if (value == null) {
+            throw new IllegalStateException();
+          }
+          if (value.decAndGetReferenceCount() == 0) {
             return null;
           }
           return value;
@@ -235,23 +239,17 @@ class ClusterSchemaFetchExecutor {
 
   private static class DeviceSchemaFetchTask {
 
-    private final Set<String> measurementSet = new HashSet<>();
+    private final Set<String> measurementSet = Collections.synchronizedSet(new HashSet<>());
 
     private volatile boolean hasSubmitThread = false;
 
-    private volatile ClusterSchemaTree taskResult;
+    private volatile ClusterSchemaTree taskResult = null;
 
     private boolean canCoverRequest(List<String> measurements) {
       if (measurementSet.size() < measurements.size()) {
         return false;
       }
-      for (String measurement : measurements) {
-        if (!measurementSet.contains(measurement)) {
-          return false;
-        }
-      }
-
-      return true;
+      return measurementSet.containsAll(measurements);
     }
 
     private void addRequest(List<String> measurements) {
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/common/schematree/ClusterSchemaTreeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/common/schematree/ClusterSchemaTreeTest.java
index 000bdc10de..0dec1a056f 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/common/schematree/ClusterSchemaTreeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/common/schematree/ClusterSchemaTreeTest.java
@@ -37,6 +37,7 @@ import org.mockito.internal.util.collections.Sets;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
@@ -592,4 +593,24 @@ public class ClusterSchemaTreeTest {
             .searchDeviceSchemaInfo(new PartialPath("root.sg.d1"), Collections.singletonList("s1"))
             .isAligned());
   }
+
+  @Test
+  public void testExtractDeviceSubTree() throws Exception {
+    SchemaNode root = generateSchemaTree();
+    ClusterSchemaTree schemaTree = new ClusterSchemaTree(root);
+    schemaTree.setDatabases(Collections.singleton("root.sg"));
+    ClusterSchemaTree subTree =
+        schemaTree.extractDeviceSubTree(
+            new PartialPath("root.sg.d1"), Arrays.asList("s1", "status"));
+    Assert.assertEquals(
+        2,
+        subTree
+            .searchDeviceSchemaInfo(new PartialPath("root.sg.d1"), Arrays.asList("s1", "status"))
+            .getMeasurementSchemaList()
+            .size());
+    ClusterSchemaTree emptyTree =
+        schemaTree.extractDeviceSubTree(
+            new PartialPath("root.sg.d"), Collections.singletonList("s1"));
+    Assert.assertTrue(emptyTree.isEmpty());
+  }
 }


[iotdb] 07/08: basically implement

Posted by zy...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zyk pushed a commit to branch concurrent_schema_fetch
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit ac49e5c19cbfca2f753aa52ebcf9210db0c70532
Author: Marccos <15...@qq.com>
AuthorDate: Thu Dec 15 14:42:38 2022 +0800

    basically implement
---
 .../mpp/common/schematree/ClusterSchemaTree.java   |  40 ++++
 .../analyze/schema/ClusterSchemaFetchExecutor.java | 222 +++++++++++----------
 .../plan/analyze/schema/ClusterSchemaFetcher.java  |  27 ++-
 3 files changed, 180 insertions(+), 109 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/ClusterSchemaTree.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/ClusterSchemaTree.java
index d1581176ca..9556086fab 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/ClusterSchemaTree.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/ClusterSchemaTree.java
@@ -325,4 +325,44 @@ public class ClusterSchemaTree implements ISchemaTree {
   public boolean isEmpty() {
     return root.getChildren() == null || root.getChildren().size() == 0;
   }
+
+  public ClusterSchemaTree extractDeviceSubTree(PartialPath devicePath, List<String> measurements) {
+    SchemaNode root = new SchemaInternalNode(PATH_ROOT);
+    String[] nodes = devicePath.getNodes();
+    SchemaNode cur = root;
+    SchemaNode clonedCur = root;
+    SchemaNode clonedChild;
+    for (int i = 1; i < nodes.length - 1; i++) {
+      cur = cur.getChild(nodes[i]);
+      if (cur == null) {
+        return new ClusterSchemaTree();
+      }
+
+      clonedChild = new SchemaInternalNode(cur.getName());
+      clonedCur.addChild(nodes[i], clonedChild);
+      clonedCur = clonedChild;
+    }
+
+    cur = cur.getChild(nodes[nodes.length - 1]);
+    if (cur == null || !cur.isEntity()) {
+      return new ClusterSchemaTree();
+    }
+
+    SchemaEntityNode deviceNode = cur.getAsEntityNode();
+    SchemaEntityNode clonedDeviceNode = new SchemaEntityNode(nodes[nodes.length - 1]);
+    clonedDeviceNode.setAligned(deviceNode.isAligned());
+    SchemaNode child;
+    SchemaMeasurementNode measurementNode;
+    for (String measurement : measurements) {
+      child = deviceNode.getChild(measurement);
+      if (child.isMeasurement()) {
+        measurementNode = child.getAsMeasurementNode();
+        clonedDeviceNode.addChild(measurementNode.getName(), measurementNode);
+        if (measurementNode.getAlias() != null) {
+          clonedDeviceNode.addAliasChild(measurementNode.getAlias(), measurementNode);
+        }
+      }
+    }
+    return new ClusterSchemaTree(root);
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetchExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetchExecutor.java
index 7f25b2f640..a76f1a23c0 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetchExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetchExecutor.java
@@ -34,7 +34,6 @@ import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.common.block.column.Column;
 import org.apache.iotdb.tsfile.utils.Binary;
-import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 import java.io.ByteArrayInputStream;
@@ -46,8 +45,6 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.BiFunction;
@@ -61,14 +58,9 @@ class ClusterSchemaFetchExecutor {
   private final BiFunction<Long, Statement, ExecutionResult> statementExecutor;
   private final Function<PartialPath, Map<Integer, Template>> templateSetInfoProvider;
 
-  private final Map<PartialPath, Pair<AtomicInteger, ClusterSchemaTree>> fetchedResultMap =
+  private final Map<PartialPath, DeviceSchemaFetchTaskExecutor> deviceSchemaFetchTaskExecutorMap =
       new ConcurrentHashMap<>();
 
-  private final Map<PartialPath, DeviceSchemaFetchTask> executingTaskMap =
-      new ConcurrentHashMap<>();
-
-  private final Map<PartialPath, DeviceSchemaFetchTask> waitingTaskMap = new ConcurrentHashMap<>();
-
   ClusterSchemaFetchExecutor(
       Coordinator coordinator,
       Supplier<Long> queryIdProvider,
@@ -81,69 +73,27 @@ class ClusterSchemaFetchExecutor {
   }
 
   ClusterSchemaTree fetchSchemaOfOneDevice(PartialPath devicePath, List<String> measurements) {
-    final AtomicBoolean shouldWait = new AtomicBoolean(true);
-    executingTaskMap.compute(
+    ClusterSchemaTree result =
+        deviceSchemaFetchTaskExecutorMap
+            .compute(
+                devicePath,
+                (key, value) -> {
+                  if (value == null) {
+                    value = new DeviceSchemaFetchTaskExecutor();
+                    value.incReferenceCount();
+                  }
+                  return value;
+                })
+            .execute(devicePath, measurements);
+    deviceSchemaFetchTaskExecutorMap.compute(
         devicePath,
         (key, value) -> {
-          if (value == null) {
+          if (value == null || value.decAndGetReferenceCount() == 0) {
             return null;
           }
-          shouldWait.set(value.checkAndAddWaitingThread(measurements));
           return value;
         });
-
-    if (!shouldWait.get()) {
-      DeviceSchemaFetchTask task =
-          waitingTaskMap.compute(
-              devicePath,
-              (key, value) -> {
-                if (value == null) {
-                  value = new DeviceSchemaFetchTask();
-                }
-                value.addWaitingThread(measurements);
-                return value;
-              });
-      if (executingTaskMap.get(devicePath) != task) {
-        synchronized (task) {
-          if (executingTaskMap.get(devicePath) != task) {
-            while (true) {
-              if (executingTaskMap.computeIfAbsent(devicePath, key -> task) != task) {
-                break;
-              }
-            }
-            PathPatternTree patternTree = new PathPatternTree();
-            for (String measurement : task.measurementSet) {
-              patternTree.appendFullPath(devicePath, measurement);
-            }
-            ClusterSchemaTree fetchedSchemaTree =
-                executeSchemaFetchQuery(
-                    new SchemaFetchStatement(
-                        patternTree, templateSetInfoProvider.apply(devicePath), false));
-            Pair<AtomicInteger, ClusterSchemaTree> fetchSchemaResult =
-                new Pair<>(new AtomicInteger(task.waitingThreadNum), fetchedSchemaTree);
-            while (true) {
-              if (fetchedResultMap.computeIfAbsent(devicePath, key -> fetchSchemaResult)
-                  != fetchSchemaResult) {
-                break;
-              }
-            }
-          }
-        }
-      }
-    }
-
-    while (true) {
-      if (!fetchedResultMap.containsKey(devicePath)) {
-        break;
-      }
-    }
-    ClusterSchemaTree schemaTree = new ClusterSchemaTree();
-    Pair<AtomicInteger, ClusterSchemaTree> pair = fetchedResultMap.get(devicePath);
-    if (pair.left.decrementAndGet() == 0) {
-      fetchedResultMap.remove(devicePath);
-    }
-
-    return null;
+    return result;
   }
 
   ClusterSchemaTree executeSchemaFetchQuery(SchemaFetchStatement schemaFetchStatement) {
@@ -207,58 +157,91 @@ class ClusterSchemaFetchExecutor {
 
   private class DeviceSchemaFetchTaskExecutor {
 
-    private volatile DeviceSchemaFetchTask waitingTask;
+    // buffer all the requests, waiting for execution
+    private volatile DeviceSchemaFetchTask newTask;
 
+    // the task currently being executed
     private volatile DeviceSchemaFetchTask executingTask;
 
-    private volatile int restThreadNum;
-
-    private volatile ClusterSchemaTree fetchedResult;
-
+    // used for concurrency control of read/write operations on newTask
     private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
 
-    private ClusterSchemaTree execute(List<String> measurements) {
-      boolean needNewTask = false;
-      readWriteLock.readLock().lock();
-      try {
-        if (executingTask != null) {
-          needNewTask = !executingTask.checkAndAddWaitingThread(measurements);
-        }
-        if (needNewTask) {
-          DeviceSchemaFetchTask task = waitingTask;
-          synchronized (this) {
-            readWriteLock.readLock().unlock();
-            readWriteLock.writeLock().lock();
-            try {
-              if (waitingTask == null) {
-                waitingTask = new DeviceSchemaFetchTask();
-                task = waitingTask;
+    // thread-safe, protected by deviceSchemaFetchTaskExecutorMap.compute
+    private int referenceCount = 0;
+
+    private ClusterSchemaTree execute(PartialPath devicePath, List<String> measurements) {
+      DeviceSchemaFetchTask task = this.executingTask;
+      // check whether the executing task can cover this request; if so, just wait for that task
+      if (task == null || !task.canCoverRequest(measurements)) {
+        readWriteLock.readLock().lock();
+        try {
+          if ((task = this.newTask) == null) {
+            synchronized (this) {
+              if ((task = this.newTask) == null) {
+                task = this.newTask = new DeviceSchemaFetchTask();
               }
-              waitingTask.addWaitingThread(measurements);
-            } finally {
-              readWriteLock.writeLock().unlock();
             }
           }
-          synchronized (task) {
+          // this operation must be blocked while the task is being submitted,
+          // since request info added after the task has been submitted will be invalid
+          task.addRequest(measurements);
+        } finally {
+          readWriteLock.readLock().unlock();
+        }
+        if (!task.isSubmitting()) {
+          if (task.markSubmitting()) {
+            // only one thread will execute this block
+            while (true) {
+              // waiting for execution
+              if (this.executingTask == null) {
+                // block all request adding operation
+                readWriteLock.writeLock().lock();
+                try {
+                  // mark this task as executing so that new requests are checked against it
+                  this.executingTask = task;
+                  // free this field so that it can buffer a new task
+                  this.newTask = null;
+                  break;
+                } finally {
+                  readWriteLock.writeLock().unlock();
+                }
+              }
+            }
+            // do execution and save fetched result
+            task.saveResult(
+                executeSchemaFetchQuery(
+                    new SchemaFetchStatement(
+                        task.generatePatternTree(devicePath),
+                        templateSetInfoProvider.apply(devicePath),
+                        false)));
+            // release this field for new task execution
+            this.executingTask = null;
           }
         }
-      } finally {
-        readWriteLock.readLock().unlock();
       }
 
-      if (!executingTask.checkAndAddWaitingThread(measurements)) {}
+      // each request takes needed result from the fetched schema tree
+      return task.extractResult(devicePath, measurements);
+    }
+
+    private void incReferenceCount() {
+      referenceCount++;
+    }
 
-      return null;
+    private int decAndGetReferenceCount() {
+      return --referenceCount;
     }
   }
 
   private static class DeviceSchemaFetchTask {
 
-    private int waitingThreadNum = 0;
-
     private final Set<String> measurementSet = new HashSet<>();
 
-    private boolean checkAndAddWaitingThread(List<String> measurements) {
+    private volatile boolean hasSubmitThread = false;
+
+    private volatile ClusterSchemaTree taskResult;
+
+    private boolean canCoverRequest(List<String> measurements) {
       if (measurementSet.size() < measurements.size()) {
         return false;
       }
@@ -267,13 +250,50 @@ class ClusterSchemaFetchExecutor {
           return false;
         }
       }
-      waitingThreadNum++;
+
       return true;
     }
 
-    private void addWaitingThread(List<String> measurements) {
-      waitingThreadNum++;
+    private void addRequest(List<String> measurements) {
       measurementSet.addAll(measurements);
     }
+
+    private boolean isSubmitting() {
+      return hasSubmitThread;
+    }
+
+    private boolean markSubmitting() {
+      if (hasSubmitThread) {
+        return false;
+      }
+      synchronized (this) {
+        if (hasSubmitThread) {
+          return false;
+        }
+        hasSubmitThread = true;
+        return true;
+      }
+    }
+
+    private PathPatternTree generatePatternTree(PartialPath devicePath) {
+      PathPatternTree patternTree = new PathPatternTree();
+      for (String measurement : measurementSet) {
+        patternTree.appendFullPath(devicePath, measurement);
+      }
+      return patternTree;
+    }
+
+    private void saveResult(ClusterSchemaTree clusterSchemaTree) {
+      taskResult = clusterSchemaTree;
+    }
+
+    private ClusterSchemaTree extractResult(PartialPath devicePath, List<String> measurements) {
+      while (true) {
+        if (taskResult != null) {
+          break;
+        }
+      }
+      return taskResult.extractDeviceSubTree(devicePath, measurements);
+    }
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetcher.java
index 52eabcdcc6..6a80668a5a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetcher.java
@@ -124,9 +124,13 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
     }
 
     List<PartialPath> fullPathList = new ArrayList<>();
+    Map<PartialPath, List<String>> deviceMap = new HashMap<>();
     for (PartialPath pattern : pathPatternList) {
       if (!pattern.hasWildcard()) {
         fullPathList.add(pattern);
+        deviceMap
+            .computeIfAbsent(pattern.getDevicePath(), k -> new ArrayList<>())
+            .add(pattern.getMeasurement());
       }
     }
 
@@ -164,9 +168,15 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
         }
       }
 
-      schemaTree =
-          clusterSchemaFetchExecutor.executeSchemaFetchQuery(
-              new SchemaFetchStatement(patternTree, templateMap, false));
+      if (deviceMap.size() == 1) {
+        Map.Entry<PartialPath, List<String>> entry = deviceMap.entrySet().iterator().next();
+        schemaTree =
+            clusterSchemaFetchExecutor.fetchSchemaOfOneDevice(entry.getKey(), entry.getValue());
+      } else {
+        schemaTree =
+            clusterSchemaFetchExecutor.executeSchemaFetchQuery(
+                new SchemaFetchStatement(patternTree, templateMap, false));
+      }
 
       // only cache the schema fetched by full path
       List<MeasurementPath> measurementPathList;
@@ -204,11 +214,12 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
       }
 
       // try fetch the missing schema from remote and cache fetched schema
-      PathPatternTree patternTree = new PathPatternTree();
-      for (int index : indexOfMissingMeasurements) {
-        patternTree.appendFullPath(devicePath, measurements[index]);
-      }
-      ClusterSchemaTree remoteSchemaTree = fetchSchemaFromRemote(patternTree);
+      ClusterSchemaTree remoteSchemaTree =
+          clusterSchemaFetchExecutor.fetchSchemaOfOneDevice(
+              devicePath,
+              indexOfMissingMeasurements.stream()
+                  .map(index -> measurements[index])
+                  .collect(Collectors.toList()));
       if (!remoteSchemaTree.isEmpty()) {
         schemaTree.mergeSchemaTree(remoteSchemaTree);
         schemaCache.put(remoteSchemaTree);


[iotdb] 01/08: add analyze.schema package

Posted by zy...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zyk pushed a commit to branch concurrent_schema_fetch
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit f4a8189aec7fecc275bab65d677fdd9129da15ee
Author: Marccos <15...@qq.com>
AuthorDate: Mon Dec 12 10:43:08 2022 +0800

    add analyze.schema package
---
 .../java/org/apache/iotdb/db/client/DataNodeInternalClient.java     | 6 +++---
 .../apache/iotdb/db/mpp/execution/executor/RegionWriteExecutor.java | 2 +-
 server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java  | 2 +-
 .../java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java   | 2 ++
 .../main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java    | 3 +++
 .../db/mpp/plan/analyze/{ => schema}/ClusterSchemaFetcher.java      | 3 ++-
 .../iotdb/db/mpp/plan/analyze/{ => schema}/ISchemaFetcher.java      | 2 +-
 .../iotdb/db/mpp/plan/analyze/{ => schema}/SchemaValidator.java     | 2 +-
 .../db/mpp/plan/analyze/{ => schema}/StandaloneSchemaFetcher.java   | 2 +-
 .../java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java | 2 +-
 .../org/apache/iotdb/db/mpp/plan/scheduler/StandaloneScheduler.java | 2 +-
 .../java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java   | 6 +++---
 .../apache/iotdb/db/protocol/rest/impl/GrafanaApiServiceImpl.java   | 6 +++---
 .../org/apache/iotdb/db/protocol/rest/impl/RestApiServiceImpl.java  | 6 +++---
 .../org/apache/iotdb/db/service/metrics/IoTDBInternalReporter.java  | 6 +++---
 .../apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java   | 6 +++---
 .../db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java      | 6 +++---
 server/src/main/java/org/apache/iotdb/db/sync/SyncService.java      | 2 +-
 .../main/java/org/apache/iotdb/db/sync/pipedata/load/ILoader.java   | 6 +++---
 .../org/apache/iotdb/db/sync/transport/server/ReceiverManager.java  | 2 +-
 .../java/org/apache/iotdb/db/wal/recover/file/TsFilePlanRedoer.java | 2 +-
 .../org/apache/iotdb/db/mpp/plan/StandaloneCoordinatorTest.java     | 4 ++--
 .../org/apache/iotdb/db/mpp/plan/analyze/FakeSchemaFetcherImpl.java | 1 +
 .../java/org/apache/iotdb/db/mpp/plan/plan/distribution/Util.java   | 2 +-
 24 files changed, 45 insertions(+), 38 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/client/DataNodeInternalClient.java b/server/src/main/java/org/apache/iotdb/db/client/DataNodeInternalClient.java
index 419af63350..8573d918ef 100644
--- a/server/src/main/java/org/apache/iotdb/db/client/DataNodeInternalClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/client/DataNodeInternalClient.java
@@ -29,11 +29,11 @@ import org.apache.iotdb.db.exception.IntoProcessException;
 import org.apache.iotdb.db.mpp.common.SessionInfo;
 import org.apache.iotdb.db.mpp.plan.Coordinator;
 import org.apache.iotdb.db.mpp.plan.analyze.ClusterPartitionFetcher;
-import org.apache.iotdb.db.mpp.plan.analyze.ClusterSchemaFetcher;
 import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
-import org.apache.iotdb.db.mpp.plan.analyze.ISchemaFetcher;
 import org.apache.iotdb.db.mpp.plan.analyze.StandalonePartitionFetcher;
-import org.apache.iotdb.db.mpp.plan.analyze.StandaloneSchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ClusterSchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.StandaloneSchemaFetcher;
 import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
 import org.apache.iotdb.db.mpp.plan.statement.crud.InsertMultiTabletsStatement;
 import org.apache.iotdb.db.query.control.SessionManager;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/RegionWriteExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/RegionWriteExecutor.java
index c1f46626e7..73df88fd94 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/RegionWriteExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/RegionWriteExecutor.java
@@ -39,7 +39,7 @@ import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
 import org.apache.iotdb.db.metadata.schemaregion.SchemaEngine;
 import org.apache.iotdb.db.metadata.template.ClusterTemplateManager;
 import org.apache.iotdb.db.metadata.template.Template;
-import org.apache.iotdb.db.mpp.plan.analyze.SchemaValidator;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.SchemaValidator;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.ActivateTemplateNode;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java
index 3a3002850e..4940a4f538 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java
@@ -31,7 +31,7 @@ import org.apache.iotdb.db.mpp.common.QueryId;
 import org.apache.iotdb.db.mpp.common.SessionInfo;
 import org.apache.iotdb.db.mpp.execution.QueryIdGenerator;
 import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
-import org.apache.iotdb.db.mpp.plan.analyze.ISchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaFetcher;
 import org.apache.iotdb.db.mpp.plan.constant.DataNodeEndPoints;
 import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
 import org.apache.iotdb.db.mpp.plan.execution.IQueryExecution;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
index 81910ef650..157483c078 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
@@ -47,6 +47,8 @@ import org.apache.iotdb.db.mpp.common.header.DatasetHeaderFactory;
 import org.apache.iotdb.db.mpp.common.schematree.DeviceSchemaInfo;
 import org.apache.iotdb.db.mpp.common.schematree.ISchemaTree;
 import org.apache.iotdb.db.mpp.plan.Coordinator;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.SchemaValidator;
 import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
 import org.apache.iotdb.db.mpp.plan.expression.Expression;
 import org.apache.iotdb.db.mpp.plan.expression.ExpressionType;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java
index 010b491e55..616c6def17 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java
@@ -21,6 +21,9 @@ package org.apache.iotdb.db.mpp.plan.analyze;
 
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.mpp.common.MPPQueryContext;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ClusterSchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.StandaloneSchemaFetcher;
 import org.apache.iotdb.db.mpp.plan.statement.Statement;
 
 import static org.apache.iotdb.db.mpp.common.QueryId.mockQueryId;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetcher.java
similarity index 99%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetcher.java
index 16a49ae47a..67fc47d941 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetcher.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.plan.analyze;
+package org.apache.iotdb.db.mpp.plan.analyze.schema;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.exception.IoTDBException;
@@ -35,6 +35,7 @@ import org.apache.iotdb.db.mpp.common.schematree.ClusterSchemaTree;
 import org.apache.iotdb.db.mpp.common.schematree.DeviceSchemaInfo;
 import org.apache.iotdb.db.mpp.common.schematree.ISchemaTree;
 import org.apache.iotdb.db.mpp.plan.Coordinator;
+import org.apache.iotdb.db.mpp.plan.analyze.ClusterPartitionFetcher;
 import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
 import org.apache.iotdb.db.mpp.plan.statement.Statement;
 import org.apache.iotdb.db.mpp.plan.statement.internal.InternalCreateTimeSeriesStatement;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ISchemaFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ISchemaFetcher.java
similarity index 97%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ISchemaFetcher.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ISchemaFetcher.java
index 5ec75fe1da..37516321c0 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ISchemaFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ISchemaFetcher.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.mpp.plan.analyze;
+package org.apache.iotdb.db.mpp.plan.analyze.schema;
 
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.path.PathPatternTree;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/SchemaValidator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/SchemaValidator.java
similarity index 98%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/SchemaValidator.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/SchemaValidator.java
index 391aed20dc..01ae6cc207 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/SchemaValidator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/SchemaValidator.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.mpp.plan.analyze;
+package org.apache.iotdb.db.mpp.plan.analyze.schema;
 
 import org.apache.iotdb.commons.exception.MetadataException;
 import org.apache.iotdb.commons.path.PartialPath;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/StandaloneSchemaFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/StandaloneSchemaFetcher.java
similarity index 99%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/StandaloneSchemaFetcher.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/StandaloneSchemaFetcher.java
index f2ab109b79..5882f6f12f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/StandaloneSchemaFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/StandaloneSchemaFetcher.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.plan.analyze;
+package org.apache.iotdb.db.mpp.plan.analyze.schema;
 
 import org.apache.iotdb.commons.consensus.SchemaRegionId;
 import org.apache.iotdb.commons.exception.MetadataException;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
index 7cfa0a3fa1..25d8b6af74 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
@@ -36,8 +36,8 @@ import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeService;
 import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
 import org.apache.iotdb.db.mpp.plan.analyze.Analyzer;
 import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
-import org.apache.iotdb.db.mpp.plan.analyze.ISchemaFetcher;
 import org.apache.iotdb.db.mpp.plan.analyze.QueryType;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaFetcher;
 import org.apache.iotdb.db.mpp.plan.execution.memory.MemorySourceHandle;
 import org.apache.iotdb.db.mpp.plan.execution.memory.StatementMemorySource;
 import org.apache.iotdb.db.mpp.plan.execution.memory.StatementMemorySourceContext;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneScheduler.java
index 7e06bd5e38..741a6709df 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneScheduler.java
@@ -39,7 +39,7 @@ import org.apache.iotdb.db.mpp.execution.fragment.FragmentInfo;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceInfo;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceManager;
 import org.apache.iotdb.db.mpp.plan.analyze.QueryType;
-import org.apache.iotdb.db.mpp.plan.analyze.SchemaValidator;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.SchemaValidator;
 import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
diff --git a/server/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java b/server/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java
index 0d1193fa6c..1b8b07b4ac 100644
--- a/server/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java
+++ b/server/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java
@@ -25,11 +25,11 @@ import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.mpp.plan.Coordinator;
 import org.apache.iotdb.db.mpp.plan.analyze.ClusterPartitionFetcher;
-import org.apache.iotdb.db.mpp.plan.analyze.ClusterSchemaFetcher;
 import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
-import org.apache.iotdb.db.mpp.plan.analyze.ISchemaFetcher;
 import org.apache.iotdb.db.mpp.plan.analyze.StandalonePartitionFetcher;
-import org.apache.iotdb.db.mpp.plan.analyze.StandaloneSchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ClusterSchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.StandaloneSchemaFetcher;
 import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
 import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowStatement;
 import org.apache.iotdb.db.query.control.SessionManager;
diff --git a/server/src/main/java/org/apache/iotdb/db/protocol/rest/impl/GrafanaApiServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/protocol/rest/impl/GrafanaApiServiceImpl.java
index fbba149e66..3c0dc2f692 100644
--- a/server/src/main/java/org/apache/iotdb/db/protocol/rest/impl/GrafanaApiServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/protocol/rest/impl/GrafanaApiServiceImpl.java
@@ -22,11 +22,11 @@ import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.mpp.plan.Coordinator;
 import org.apache.iotdb.db.mpp.plan.analyze.ClusterPartitionFetcher;
-import org.apache.iotdb.db.mpp.plan.analyze.ClusterSchemaFetcher;
 import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
-import org.apache.iotdb.db.mpp.plan.analyze.ISchemaFetcher;
 import org.apache.iotdb.db.mpp.plan.analyze.StandalonePartitionFetcher;
-import org.apache.iotdb.db.mpp.plan.analyze.StandaloneSchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ClusterSchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.StandaloneSchemaFetcher;
 import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
 import org.apache.iotdb.db.mpp.plan.execution.IQueryExecution;
 import org.apache.iotdb.db.mpp.plan.parser.StatementGenerator;
diff --git a/server/src/main/java/org/apache/iotdb/db/protocol/rest/impl/RestApiServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/protocol/rest/impl/RestApiServiceImpl.java
index 4892e13da7..c2f4faf687 100644
--- a/server/src/main/java/org/apache/iotdb/db/protocol/rest/impl/RestApiServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/protocol/rest/impl/RestApiServiceImpl.java
@@ -22,11 +22,11 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.conf.rest.IoTDBRestServiceDescriptor;
 import org.apache.iotdb.db.mpp.plan.Coordinator;
 import org.apache.iotdb.db.mpp.plan.analyze.ClusterPartitionFetcher;
-import org.apache.iotdb.db.mpp.plan.analyze.ClusterSchemaFetcher;
 import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
-import org.apache.iotdb.db.mpp.plan.analyze.ISchemaFetcher;
 import org.apache.iotdb.db.mpp.plan.analyze.StandalonePartitionFetcher;
-import org.apache.iotdb.db.mpp.plan.analyze.StandaloneSchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ClusterSchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.StandaloneSchemaFetcher;
 import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
 import org.apache.iotdb.db.mpp.plan.execution.IQueryExecution;
 import org.apache.iotdb.db.mpp.plan.parser.StatementGenerator;
diff --git a/server/src/main/java/org/apache/iotdb/db/service/metrics/IoTDBInternalReporter.java b/server/src/main/java/org/apache/iotdb/db/service/metrics/IoTDBInternalReporter.java
index 18f8b8b7b2..fe155e7439 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/metrics/IoTDBInternalReporter.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/metrics/IoTDBInternalReporter.java
@@ -27,11 +27,11 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.mpp.common.SessionInfo;
 import org.apache.iotdb.db.mpp.plan.Coordinator;
 import org.apache.iotdb.db.mpp.plan.analyze.ClusterPartitionFetcher;
-import org.apache.iotdb.db.mpp.plan.analyze.ClusterSchemaFetcher;
 import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
-import org.apache.iotdb.db.mpp.plan.analyze.ISchemaFetcher;
 import org.apache.iotdb.db.mpp.plan.analyze.StandalonePartitionFetcher;
-import org.apache.iotdb.db.mpp.plan.analyze.StandaloneSchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ClusterSchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.StandaloneSchemaFetcher;
 import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
 import org.apache.iotdb.db.mpp.plan.parser.StatementGenerator;
 import org.apache.iotdb.db.mpp.plan.statement.Statement;
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
index f3f9d4e5ba..fc4b6465f8 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
@@ -35,11 +35,11 @@ import org.apache.iotdb.db.metadata.template.TemplateQueryType;
 import org.apache.iotdb.db.mpp.common.header.DatasetHeader;
 import org.apache.iotdb.db.mpp.plan.Coordinator;
 import org.apache.iotdb.db.mpp.plan.analyze.ClusterPartitionFetcher;
-import org.apache.iotdb.db.mpp.plan.analyze.ClusterSchemaFetcher;
 import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
-import org.apache.iotdb.db.mpp.plan.analyze.ISchemaFetcher;
 import org.apache.iotdb.db.mpp.plan.analyze.StandalonePartitionFetcher;
-import org.apache.iotdb.db.mpp.plan.analyze.StandaloneSchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ClusterSchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.StandaloneSchemaFetcher;
 import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
 import org.apache.iotdb.db.mpp.plan.execution.IQueryExecution;
 import org.apache.iotdb.db.mpp.plan.parser.StatementGenerator;
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
index e7011a2985..2a5201ba1f 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -77,11 +77,11 @@ import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceManager;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceState;
 import org.apache.iotdb.db.mpp.plan.Coordinator;
 import org.apache.iotdb.db.mpp.plan.analyze.ClusterPartitionFetcher;
-import org.apache.iotdb.db.mpp.plan.analyze.ClusterSchemaFetcher;
 import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
-import org.apache.iotdb.db.mpp.plan.analyze.ISchemaFetcher;
 import org.apache.iotdb.db.mpp.plan.analyze.StandalonePartitionFetcher;
-import org.apache.iotdb.db.mpp.plan.analyze.StandaloneSchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ClusterSchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.StandaloneSchemaFetcher;
 import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
 import org.apache.iotdb.db.mpp.plan.execution.IQueryExecution;
 import org.apache.iotdb.db.mpp.plan.expression.Expression;
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/SyncService.java b/server/src/main/java/org/apache/iotdb/db/sync/SyncService.java
index ccc95932b5..d5df9a8a73 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/SyncService.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/SyncService.java
@@ -39,7 +39,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
-import org.apache.iotdb.db.mpp.plan.analyze.ISchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaFetcher;
 import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeSinkStatement;
 import org.apache.iotdb.db.qp.utils.DateTimeUtils;
 import org.apache.iotdb.db.sync.common.ClusterSyncInfoFetcher;
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/pipedata/load/ILoader.java b/server/src/main/java/org/apache/iotdb/db/sync/pipedata/load/ILoader.java
index 754893d562..c77abdebf2 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/pipedata/load/ILoader.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/pipedata/load/ILoader.java
@@ -22,11 +22,11 @@ import org.apache.iotdb.commons.exception.sync.PipeDataLoadException;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.mpp.plan.analyze.ClusterPartitionFetcher;
-import org.apache.iotdb.db.mpp.plan.analyze.ClusterSchemaFetcher;
 import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
-import org.apache.iotdb.db.mpp.plan.analyze.ISchemaFetcher;
 import org.apache.iotdb.db.mpp.plan.analyze.StandalonePartitionFetcher;
-import org.apache.iotdb.db.mpp.plan.analyze.StandaloneSchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ClusterSchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.StandaloneSchemaFetcher;
 
 /**
  * This interface is used to load files, including tsFile, syncTask, schema, modsFile and
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/transport/server/ReceiverManager.java b/server/src/main/java/org/apache/iotdb/db/sync/transport/server/ReceiverManager.java
index d0c348fda1..a8ffdab2d3 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/transport/server/ReceiverManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/transport/server/ReceiverManager.java
@@ -30,7 +30,7 @@ import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.mpp.plan.Coordinator;
 import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
-import org.apache.iotdb.db.mpp.plan.analyze.ISchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaFetcher;
 import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.SetStorageGroupStatement;
 import org.apache.iotdb.db.query.control.SessionManager;
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/recover/file/TsFilePlanRedoer.java b/server/src/main/java/org/apache/iotdb/db/wal/recover/file/TsFilePlanRedoer.java
index f468e09d9b..89430bc91f 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/recover/file/TsFilePlanRedoer.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/recover/file/TsFilePlanRedoer.java
@@ -27,7 +27,7 @@ import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.exception.WriteProcessException;
 import org.apache.iotdb.db.metadata.idtable.IDTable;
 import org.apache.iotdb.db.metadata.idtable.entry.DeviceIDFactory;
-import org.apache.iotdb.db.mpp.plan.analyze.SchemaValidator;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.SchemaValidator;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.DeleteDataNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowNode;
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/StandaloneCoordinatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/StandaloneCoordinatorTest.java
index 060990cfcb..74c5aee931 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/StandaloneCoordinatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/StandaloneCoordinatorTest.java
@@ -28,9 +28,9 @@ import org.apache.iotdb.db.engine.flush.FlushManager;
 import org.apache.iotdb.db.localconfignode.LocalConfigNode;
 import org.apache.iotdb.db.mpp.common.SessionInfo;
 import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
-import org.apache.iotdb.db.mpp.plan.analyze.ISchemaFetcher;
 import org.apache.iotdb.db.mpp.plan.analyze.StandalonePartitionFetcher;
-import org.apache.iotdb.db.mpp.plan.analyze.StandaloneSchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.StandaloneSchemaFetcher;
 import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
 import org.apache.iotdb.db.mpp.plan.parser.StatementGenerator;
 import org.apache.iotdb.db.mpp.plan.statement.Statement;
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/analyze/FakeSchemaFetcherImpl.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/analyze/FakeSchemaFetcherImpl.java
index 61951feee4..a65b83d03a 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/analyze/FakeSchemaFetcherImpl.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/analyze/FakeSchemaFetcherImpl.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.db.mpp.common.schematree.node.SchemaEntityNode;
 import org.apache.iotdb.db.mpp.common.schematree.node.SchemaInternalNode;
 import org.apache.iotdb.db.mpp.common.schematree.node.SchemaMeasurementNode;
 import org.apache.iotdb.db.mpp.common.schematree.node.SchemaNode;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaFetcher;
 import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/Util.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/Util.java
index a4a3c6c580..7c908b2f4d 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/Util.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/Util.java
@@ -46,7 +46,7 @@ import org.apache.iotdb.db.mpp.common.schematree.node.SchemaNode;
 import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
 import org.apache.iotdb.db.mpp.plan.analyze.Analyzer;
 import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
-import org.apache.iotdb.db.mpp.plan.analyze.ISchemaFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaFetcher;
 import org.apache.iotdb.db.mpp.plan.expression.Expression;
 import org.apache.iotdb.db.mpp.plan.expression.leaf.TimeSeriesOperand;
 import org.apache.iotdb.db.mpp.plan.parser.StatementGenerator;


[iotdb] 05/08: save trial

Posted by zy...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zyk pushed a commit to branch concurrent_schema_fetch
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 2b069950bc07c51c50952bb2b63d610505cac49a
Author: Marccos <15...@qq.com>
AuthorDate: Wed Dec 14 16:09:43 2022 +0800

    save trial
---
 .../analyze/schema/ClusterSchemaFetchExecutor.java | 128 ++++++++++++++-------
 .../plan/analyze/schema/ClusterSchemaFetcher.java  |   2 +-
 2 files changed, 86 insertions(+), 44 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetchExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetchExecutor.java
index 75fd5623ac..2008dadaeb 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetchExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetchExecutor.java
@@ -59,11 +59,13 @@ class ClusterSchemaFetchExecutor {
   private final BiFunction<Long, Statement, ExecutionResult> statementExecutor;
   private final Function<PartialPath, Map<Integer, Template>> templateSetInfoProvider;
 
-  private final Map<PartialPath, Pair<AtomicInteger, ClusterSchemaTree>> fetchedResultMap = new ConcurrentHashMap<>();
+  private final Map<PartialPath, Pair<AtomicInteger, ClusterSchemaTree>> fetchedResultMap =
+      new ConcurrentHashMap<>();
 
-  private final Map<PartialPath, Pair<AtomicInteger, Set<String>>> executingTaskMap = new ConcurrentHashMap<>();
+  private final Map<PartialPath, DeviceSchemaFetchTask> executingTaskMap =
+      new ConcurrentHashMap<>();
 
-  private final Map<PartialPath, Pair<AtomicInteger, Set<String>>> waitingTaskMap = new ConcurrentHashMap<>();
+  private final Map<PartialPath, DeviceSchemaFetchTask> waitingTaskMap = new ConcurrentHashMap<>();
 
   ClusterSchemaFetchExecutor(
       Coordinator coordinator,
@@ -76,54 +78,69 @@ class ClusterSchemaFetchExecutor {
     this.templateSetInfoProvider = templateSetInfoProvider;
   }
 
-  ClusterSchemaTree fetchSchemaOfOneDevice(PartialPath devicePath, List<String> measurements){
+  ClusterSchemaTree fetchSchemaOfOneDevice(PartialPath devicePath, List<String> measurements) {
 
     final AtomicBoolean shouldWait = new AtomicBoolean(true);
-    executingTaskMap.compute(devicePath, (key, value) -> {
-      if (value == null){
-        return null;
-      }
-      if (value.right.size() < measurements.size()){
-        shouldWait.set(false);
-        return value;
-      }
-      for (String measurement: measurements){
-        if (!value.right.contains(measurement)){
-          shouldWait.set(false);
+    executingTaskMap.compute(
+        devicePath,
+        (key, value) -> {
+          if (value == null) {
+            return null;
+          }
+          shouldWait.set(value.checkAndAddWaitingThread(measurements));
           return value;
+        });
+
+    if (!shouldWait.get()) {
+      DeviceSchemaFetchTask task =
+          waitingTaskMap.compute(
+              devicePath,
+              (key, value) -> {
+                if (value == null) {
+                  value = new DeviceSchemaFetchTask();
+                }
+                value.addWaitingThread(measurements);
+                return value;
+              });
+      if (executingTaskMap.get(devicePath) != task) {
+        synchronized (task) {
+          if (executingTaskMap.get(devicePath) != task) {
+            while (true) {
+              if (executingTaskMap.computeIfAbsent(devicePath, key -> task) != task) {
+                break;
+              }
+            }
+            PathPatternTree patternTree = new PathPatternTree();
+            for (String measurement : task.measurementSet) {
+              patternTree.appendFullPath(devicePath, measurement);
+            }
+            ClusterSchemaTree fetchedSchemaTree =
+                executeSchemaFetchQuery(
+                    new SchemaFetchStatement(
+                        patternTree, templateSetInfoProvider.apply(devicePath), false));
+            Pair<AtomicInteger, ClusterSchemaTree> fetchSchemaResult =
+                new Pair<>(new AtomicInteger(task.waitingThreadNum), fetchedSchemaTree);
+            while (true) {
+              if (fetchedResultMap.computeIfAbsent(devicePath, key -> fetchSchemaResult)
+                  != fetchSchemaResult) {
+                break;
+              }
+            }
+          }
         }
       }
-      value.left.getAndIncrement();
-      return value;
-    });
-    if (shouldWait.get()){
-      while (!fetchedResultMap.containsKey(devicePath));
-      ClusterSchemaTree schemaTree = new ClusterSchemaTree();
-      Pair<AtomicInteger, ClusterSchemaTree> pair = fetchedResultMap.get(devicePath);
-      if (pair.left.decrementAndGet() == 0){
-        fetchedResultMap.remove(devicePath);
-      }
-    }else {
-      Pair<AtomicInteger, Set<String>> task = waitingTaskMap.compute(devicePath, (key, value) -> {
-        if (value == null){
-          value = new Pair<>(new AtomicInteger(0), new HashSet<>());
-        }
-        value.left.getAndIncrement();
-        value.right.addAll(measurements);
-        return value;
-      });
-      synchronized (task){
-        while (executingTaskMap.computeIfAbsent(devicePath, key -> task)!=task);
-        PathPatternTree patternTree = new PathPatternTree();
-        for (String measurement: task.right){
-          patternTree.appendFullPath(devicePath, measurement);
-        }
-        ClusterSchemaTree fetchedSchemaTree = executeSchemaFetchQuery(new SchemaFetchStatement(patternTree, templateSetInfoProvider.apply(devicePath), false));
-        Pair<AtomicInteger, ClusterSchemaTree> fetchSchemaResult = new Pair<>(task.left, fetchedSchemaTree);
-        while (fetchedResultMap.computeIfAbsent(devicePath, key -> fetchSchemaResult) != fetchSchemaResult);
+    }
 
+    while (true) {
+      if (!fetchedResultMap.containsKey(devicePath)) {
+        break;
       }
     }
+    ClusterSchemaTree schemaTree = new ClusterSchemaTree();
+    Pair<AtomicInteger, ClusterSchemaTree> pair = fetchedResultMap.get(devicePath);
+    if (pair.left.decrementAndGet() == 0) {
+      fetchedResultMap.remove(devicePath);
+    }
 
     return null;
   }
@@ -186,4 +203,29 @@ class ClusterSchemaFetchExecutor {
       // Totally memory operation. This case won't happen.
     }
   }
+
+  private static class DeviceSchemaFetchTask {
+
+    private int waitingThreadNum = 0;
+
+    private final Set<String> measurementSet = new HashSet<>();
+
+    private boolean checkAndAddWaitingThread(List<String> measurements) {
+      if (measurementSet.size() < measurements.size()) {
+        return false;
+      }
+      for (String measurement : measurements) {
+        if (!measurementSet.contains(measurement)) {
+          return false;
+        }
+      }
+      waitingThreadNum++;
+      return true;
+    }
+
+    private void addWaitingThread(List<String> measurements) {
+      waitingThreadNum++;
+      measurementSet.addAll(measurements);
+    }
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetcher.java
index 564b97bfba..52eabcdcc6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetcher.java
@@ -84,7 +84,7 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
                   ClusterPartitionFetcher.getInstance(),
                   this,
                   config.getQueryTimeoutThreshold()),
-              templateManager::checkAllRelatedTemplate);
+          templateManager::checkAllRelatedTemplate);
 
   private static final class ClusterSchemaFetcherHolder {
     private static final ClusterSchemaFetcher INSTANCE = new ClusterSchemaFetcher();