You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by zy...@apache.org on 2022/12/15 09:44:03 UTC

[iotdb] 06/08: save trial

This is an automated email from the ASF dual-hosted git repository.

zyk pushed a commit to branch concurrent_schema_fetch
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 0ebd32b922e10130f62b14c1d4bd41aba0a9fce7
Author: Marccos <15...@qq.com>
AuthorDate: Wed Dec 14 20:19:48 2022 +0800

    save trial
---
 .../analyze/schema/ClusterSchemaFetchExecutor.java | 50 +++++++++++++++++++++-
 1 file changed, 49 insertions(+), 1 deletion(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetchExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetchExecutor.java
index 2008dadaeb..7f25b2f640 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetchExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetchExecutor.java
@@ -48,6 +48,8 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.BiFunction;
 import java.util.function.Function;
 import java.util.function.Supplier;
@@ -79,7 +81,6 @@ class ClusterSchemaFetchExecutor {
   }
 
   ClusterSchemaTree fetchSchemaOfOneDevice(PartialPath devicePath, List<String> measurements) {
-
     final AtomicBoolean shouldWait = new AtomicBoolean(true);
     executingTaskMap.compute(
         devicePath,
@@ -204,6 +205,53 @@ class ClusterSchemaFetchExecutor {
     }
   }
 
+  private class DeviceSchemaFetchTaskExecutor {
+
+    private volatile DeviceSchemaFetchTask waitingTask;
+
+    private volatile DeviceSchemaFetchTask executingTask;
+
+    private volatile int restThreadNum;
+
+    private volatile ClusterSchemaTree fetchedResult;
+
+    private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+
+    private ClusterSchemaTree execute(List<String> measurements) {
+      boolean needNewTask = false;
+      readWriteLock.readLock().lock();
+      try {
+        if (executingTask != null) {
+          needNewTask = !executingTask.checkAndAddWaitingThread(measurements);
+        }
+        if (needNewTask) {
+          DeviceSchemaFetchTask task = waitingTask;
+          synchronized (this) {
+            readWriteLock.readLock().unlock();
+            readWriteLock.writeLock().lock();
+            try {
+              if (waitingTask == null) {
+                waitingTask = new DeviceSchemaFetchTask();
+                task = waitingTask;
+              }
+              waitingTask.addWaitingThread(measurements);
+            } finally {
+              readWriteLock.writeLock().unlock();
+            }
+          }
+          synchronized (task) {
+          }
+        }
+      } finally {
+        readWriteLock.readLock().unlock();
+      }
+
+      if (!executingTask.checkAndAddWaitingThread(measurements)) {}
+
+      return null;
+    }
+  }
+
   private static class DeviceSchemaFetchTask {
 
     private int waitingThreadNum = 0;