You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by zy...@apache.org on 2022/12/15 09:44:03 UTC
[iotdb] 06/08: save trial
This is an automated email from the ASF dual-hosted git repository.
zyk pushed a commit to branch concurrent_schema_fetch
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 0ebd32b922e10130f62b14c1d4bd41aba0a9fce7
Author: Marccos <15...@qq.com>
AuthorDate: Wed Dec 14 20:19:48 2022 +0800
save trial
---
.../analyze/schema/ClusterSchemaFetchExecutor.java | 50 +++++++++++++++++++++-
1 file changed, 49 insertions(+), 1 deletion(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetchExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetchExecutor.java
index 2008dadaeb..7f25b2f640 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetchExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetchExecutor.java
@@ -48,6 +48,8 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;
@@ -79,7 +81,6 @@ class ClusterSchemaFetchExecutor {
}
ClusterSchemaTree fetchSchemaOfOneDevice(PartialPath devicePath, List<String> measurements) {
-
final AtomicBoolean shouldWait = new AtomicBoolean(true);
executingTaskMap.compute(
devicePath,
@@ -204,6 +205,53 @@ class ClusterSchemaFetchExecutor {
}
}
+ /**
+  * Registers callers' measurement lists on either the currently executing
+  * {@link DeviceSchemaFetchTask} or a lazily created waiting task.
+  *
+  * <p>NOTE(review): this class is unfinished. {@link #execute(List)} always returns
+  * {@code null}, and nothing in this class assigns {@code executingTask},
+  * {@code restThreadNum} or {@code fetchedResult}.
+  */
+ private class DeviceSchemaFetchTaskExecutor {
+
+   /** Task that collects callers rejected by the executing task; created on demand. */
+   private volatile DeviceSchemaFetchTask waitingTask;
+
+   /** Task callers first try to join; may be {@code null}. */
+   private volatile DeviceSchemaFetchTask executingTask;
+
+   // NOTE(review): not read or written by execute(); presumably reserved for later work.
+   private volatile int restThreadNum;
+
+   // NOTE(review): not read or written by execute(); presumably reserved for later work.
+   private volatile ClusterSchemaTree fetchedResult;
+
+   /** Read lock guards joining the executing task; write lock guards waitingTask setup. */
+   private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+
+   /**
+    * Tries to add the caller to the executing task and, if that task refuses it,
+    * adds the caller to the waiting task, creating that task when absent.
+    *
+    * @param measurements the measurements the calling thread wants fetched
+    * @return always {@code null} for now (the fetch itself is not implemented here)
+    */
+   private ClusterSchemaTree execute(List<String> measurements) {
+     readWriteLock.readLock().lock();
+     try {
+       // Snapshot the volatile field so the null check and the call use the same task.
+       final DeviceSchemaFetchTask runningTask = executingTask;
+       final boolean needNewTask =
+           runningTask != null && !runningTask.checkAndAddWaitingThread(measurements);
+       if (needNewTask) {
+         DeviceSchemaFetchTask task;
+         // A read lock cannot be upgraded, so release it before taking the write lock.
+         // The write lock is exclusive, which makes an extra monitor on `this`
+         // unnecessary; holding one here while other readers block on it would
+         // prevent the write lock from ever being granted.
+         readWriteLock.readLock().unlock();
+         readWriteLock.writeLock().lock();
+         try {
+           if (waitingTask == null) {
+             waitingTask = new DeviceSchemaFetchTask();
+           }
+           // Read the task under the write lock so it is never null or stale.
+           task = waitingTask;
+           task.addWaitingThread(measurements);
+         } finally {
+           // Downgrade: re-acquire the read lock before releasing the write lock so
+           // the outer finally always releases a lock this thread actually holds.
+           readWriteLock.readLock().lock();
+           readWriteLock.writeLock().unlock();
+         }
+         synchronized (task) {
+           // NOTE(review): intentionally left empty, as in the original; presumably
+           // the wait-for-result logic belongs here.
+         }
+       }
+     } finally {
+       readWriteLock.readLock().unlock();
+     }
+
+     // Guard against a missing executing task instead of failing with an NPE.
+     final DeviceSchemaFetchTask currentTask = executingTask;
+     if (currentTask != null && !currentTask.checkAndAddWaitingThread(measurements)) {
+       // NOTE(review): handling of a rejected registration is not implemented yet.
+     }
+
+     return null;
+   }
+ }
+
private static class DeviceSchemaFetchTask {
private int waitingThreadNum = 0;