You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by zy...@apache.org on 2022/12/15 09:44:02 UTC

[iotdb] 05/08: save trial

This is an automated email from the ASF dual-hosted git repository.

zyk pushed a commit to branch concurrent_schema_fetch
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 2b069950bc07c51c50952bb2b63d610505cac49a
Author: Marccos <15...@qq.com>
AuthorDate: Wed Dec 14 16:09:43 2022 +0800

    save trial
---
 .../analyze/schema/ClusterSchemaFetchExecutor.java | 128 ++++++++++++++-------
 .../plan/analyze/schema/ClusterSchemaFetcher.java  |   2 +-
 2 files changed, 86 insertions(+), 44 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetchExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetchExecutor.java
index 75fd5623ac..2008dadaeb 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetchExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetchExecutor.java
@@ -59,11 +59,13 @@ class ClusterSchemaFetchExecutor {
   private final BiFunction<Long, Statement, ExecutionResult> statementExecutor;
   private final Function<PartialPath, Map<Integer, Template>> templateSetInfoProvider;
 
-  private final Map<PartialPath, Pair<AtomicInteger, ClusterSchemaTree>> fetchedResultMap = new ConcurrentHashMap<>();
+  private final Map<PartialPath, Pair<AtomicInteger, ClusterSchemaTree>> fetchedResultMap =
+      new ConcurrentHashMap<>();
 
-  private final Map<PartialPath, Pair<AtomicInteger, Set<String>>> executingTaskMap = new ConcurrentHashMap<>();
+  private final Map<PartialPath, DeviceSchemaFetchTask> executingTaskMap =
+      new ConcurrentHashMap<>();
 
-  private final Map<PartialPath, Pair<AtomicInteger, Set<String>>> waitingTaskMap = new ConcurrentHashMap<>();
+  private final Map<PartialPath, DeviceSchemaFetchTask> waitingTaskMap = new ConcurrentHashMap<>();
 
   ClusterSchemaFetchExecutor(
       Coordinator coordinator,
@@ -76,54 +78,69 @@ class ClusterSchemaFetchExecutor {
     this.templateSetInfoProvider = templateSetInfoProvider;
   }
 
-  ClusterSchemaTree fetchSchemaOfOneDevice(PartialPath devicePath, List<String> measurements){
+  ClusterSchemaTree fetchSchemaOfOneDevice(PartialPath devicePath, List<String> measurements) {
 
     final AtomicBoolean shouldWait = new AtomicBoolean(true);
-    executingTaskMap.compute(devicePath, (key, value) -> {
-      if (value == null){
-        return null;
-      }
-      if (value.right.size() < measurements.size()){
-        shouldWait.set(false);
-        return value;
-      }
-      for (String measurement: measurements){
-        if (!value.right.contains(measurement)){
-          shouldWait.set(false);
+    executingTaskMap.compute(
+        devicePath,
+        (key, value) -> {
+          if (value == null) {
+            return null;
+          }
+          shouldWait.set(value.checkAndAddWaitingThread(measurements));
           return value;
+        });
+
+    if (!shouldWait.get()) {
+      DeviceSchemaFetchTask task =
+          waitingTaskMap.compute(
+              devicePath,
+              (key, value) -> {
+                if (value == null) {
+                  value = new DeviceSchemaFetchTask();
+                }
+                value.addWaitingThread(measurements);
+                return value;
+              });
+      if (executingTaskMap.get(devicePath) != task) {
+        synchronized (task) {
+          if (executingTaskMap.get(devicePath) != task) {
+            while (true) {
+              if (executingTaskMap.computeIfAbsent(devicePath, key -> task) != task) {
+                break;
+              }
+            }
+            PathPatternTree patternTree = new PathPatternTree();
+            for (String measurement : task.measurementSet) {
+              patternTree.appendFullPath(devicePath, measurement);
+            }
+            ClusterSchemaTree fetchedSchemaTree =
+                executeSchemaFetchQuery(
+                    new SchemaFetchStatement(
+                        patternTree, templateSetInfoProvider.apply(devicePath), false));
+            Pair<AtomicInteger, ClusterSchemaTree> fetchSchemaResult =
+                new Pair<>(new AtomicInteger(task.waitingThreadNum), fetchedSchemaTree);
+            while (true) {
+              if (fetchedResultMap.computeIfAbsent(devicePath, key -> fetchSchemaResult)
+                  != fetchSchemaResult) {
+                break;
+              }
+            }
+          }
         }
       }
-      value.left.getAndIncrement();
-      return value;
-    });
-    if (shouldWait.get()){
-      while (!fetchedResultMap.containsKey(devicePath));
-      ClusterSchemaTree schemaTree = new ClusterSchemaTree();
-      Pair<AtomicInteger, ClusterSchemaTree> pair = fetchedResultMap.get(devicePath);
-      if (pair.left.decrementAndGet() == 0){
-        fetchedResultMap.remove(devicePath);
-      }
-    }else {
-      Pair<AtomicInteger, Set<String>> task = waitingTaskMap.compute(devicePath, (key, value) -> {
-        if (value == null){
-          value = new Pair<>(new AtomicInteger(0), new HashSet<>());
-        }
-        value.left.getAndIncrement();
-        value.right.addAll(measurements);
-        return value;
-      });
-      synchronized (task){
-        while (executingTaskMap.computeIfAbsent(devicePath, key -> task)!=task);
-        PathPatternTree patternTree = new PathPatternTree();
-        for (String measurement: task.right){
-          patternTree.appendFullPath(devicePath, measurement);
-        }
-        ClusterSchemaTree fetchedSchemaTree = executeSchemaFetchQuery(new SchemaFetchStatement(patternTree, templateSetInfoProvider.apply(devicePath), false));
-        Pair<AtomicInteger, ClusterSchemaTree> fetchSchemaResult = new Pair<>(task.left, fetchedSchemaTree);
-        while (fetchedResultMap.computeIfAbsent(devicePath, key -> fetchSchemaResult) != fetchSchemaResult);
+    }
 
+    while (true) {
+      if (!fetchedResultMap.containsKey(devicePath)) {
+        break;
       }
     }
+    ClusterSchemaTree schemaTree = new ClusterSchemaTree();
+    Pair<AtomicInteger, ClusterSchemaTree> pair = fetchedResultMap.get(devicePath);
+    if (pair.left.decrementAndGet() == 0) {
+      fetchedResultMap.remove(devicePath);
+    }
 
     return null;
   }
@@ -186,4 +203,29 @@ class ClusterSchemaFetchExecutor {
       // Totally memory operation. This case won't happen.
     }
   }
+
+  private static class DeviceSchemaFetchTask {
+
+    private int waitingThreadNum = 0;
+
+    private final Set<String> measurementSet = new HashSet<>();
+
+    private boolean checkAndAddWaitingThread(List<String> measurements) {
+      if (measurementSet.size() < measurements.size()) {
+        return false;
+      }
+      for (String measurement : measurements) {
+        if (!measurementSet.contains(measurement)) {
+          return false;
+        }
+      }
+      waitingThreadNum++;
+      return true;
+    }
+
+    private void addWaitingThread(List<String> measurements) {
+      waitingThreadNum++;
+      measurementSet.addAll(measurements);
+    }
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetcher.java
index 564b97bfba..52eabcdcc6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetcher.java
@@ -84,7 +84,7 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
                   ClusterPartitionFetcher.getInstance(),
                   this,
                   config.getQueryTimeoutThreshold()),
-              templateManager::checkAllRelatedTemplate);
+          templateManager::checkAllRelatedTemplate);
 
   private static final class ClusterSchemaFetcherHolder {
     private static final ClusterSchemaFetcher INSTANCE = new ClusterSchemaFetcher();