You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by zy...@apache.org on 2022/12/15 09:44:04 UTC

[iotdb] 07/08: basically implement

This is an automated email from the ASF dual-hosted git repository.

zyk pushed a commit to branch concurrent_schema_fetch
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit ac49e5c19cbfca2f753aa52ebcf9210db0c70532
Author: Marccos <15...@qq.com>
AuthorDate: Thu Dec 15 14:42:38 2022 +0800

    basically implement
---
 .../mpp/common/schematree/ClusterSchemaTree.java   |  40 ++++
 .../analyze/schema/ClusterSchemaFetchExecutor.java | 222 +++++++++++----------
 .../plan/analyze/schema/ClusterSchemaFetcher.java  |  27 ++-
 3 files changed, 180 insertions(+), 109 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/ClusterSchemaTree.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/ClusterSchemaTree.java
index d1581176ca..9556086fab 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/ClusterSchemaTree.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/ClusterSchemaTree.java
@@ -325,4 +325,44 @@ public class ClusterSchemaTree implements ISchemaTree {
   public boolean isEmpty() {
     return root.getChildren() == null || root.getChildren().size() == 0;
   }
+
+  /** Copies the device and its requested, existing measurements; fresh default tree if not found. */
+  public ClusterSchemaTree extractDeviceSubTree(PartialPath devicePath, List<String> measurements) {
+    String[] nodes = devicePath.getNodes();
+    SchemaNode clonedRoot = new SchemaInternalNode(PATH_ROOT);
+    // walk the existing tree (this.root), not the freshly created, childless clone root
+    SchemaNode cur = this.root;
+    SchemaNode clonedCur = clonedRoot;
+    for (int i = 1; i < nodes.length - 1; i++) {
+      cur = cur.getChild(nodes[i]);
+      if (cur == null) {
+        return new ClusterSchemaTree();
+      }
+      SchemaNode clonedChild = new SchemaInternalNode(cur.getName());
+      clonedCur.addChild(nodes[i], clonedChild);
+      clonedCur = clonedChild;
+    }
+    cur = cur.getChild(nodes[nodes.length - 1]);
+    if (cur == null || !cur.isEntity()) {
+      return new ClusterSchemaTree();
+    }
+    SchemaEntityNode deviceNode = cur.getAsEntityNode();
+    SchemaEntityNode clonedDeviceNode = new SchemaEntityNode(nodes[nodes.length - 1]);
+    clonedDeviceNode.setAligned(deviceNode.isAligned());
+    // attach the cloned device, otherwise the returned tree would never contain it
+    clonedCur.addChild(nodes[nodes.length - 1], clonedDeviceNode);
+    for (String measurement : measurements) {
+      SchemaNode child = deviceNode.getChild(measurement);
+      // a requested measurement may be absent from the tree; skip it rather than hit an NPE
+      if (child == null || !child.isMeasurement()) {
+        continue;
+      }
+      SchemaMeasurementNode measurementNode = child.getAsMeasurementNode();
+      clonedDeviceNode.addChild(measurementNode.getName(), measurementNode);
+      if (measurementNode.getAlias() != null) {
+        clonedDeviceNode.addAliasChild(measurementNode.getAlias(), measurementNode);
+      }
+    }
+    return new ClusterSchemaTree(clonedRoot);
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetchExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetchExecutor.java
index 7f25b2f640..a76f1a23c0 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetchExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetchExecutor.java
@@ -34,7 +34,6 @@ import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.common.block.column.Column;
 import org.apache.iotdb.tsfile.utils.Binary;
-import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 import java.io.ByteArrayInputStream;
@@ -46,8 +45,6 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.BiFunction;
@@ -61,14 +58,9 @@ class ClusterSchemaFetchExecutor {
   private final BiFunction<Long, Statement, ExecutionResult> statementExecutor;
   private final Function<PartialPath, Map<Integer, Template>> templateSetInfoProvider;
 
-  private final Map<PartialPath, Pair<AtomicInteger, ClusterSchemaTree>> fetchedResultMap =
+  private final Map<PartialPath, DeviceSchemaFetchTaskExecutor> deviceSchemaFetchTaskExecutorMap =
       new ConcurrentHashMap<>();
 
-  private final Map<PartialPath, DeviceSchemaFetchTask> executingTaskMap =
-      new ConcurrentHashMap<>();
-
-  private final Map<PartialPath, DeviceSchemaFetchTask> waitingTaskMap = new ConcurrentHashMap<>();
-
   ClusterSchemaFetchExecutor(
       Coordinator coordinator,
       Supplier<Long> queryIdProvider,
@@ -81,69 +73,27 @@ class ClusterSchemaFetchExecutor {
   }
 
   ClusterSchemaTree fetchSchemaOfOneDevice(PartialPath devicePath, List<String> measurements) {
-    final AtomicBoolean shouldWait = new AtomicBoolean(true);
-    executingTaskMap.compute(
+    ClusterSchemaTree result =
+        deviceSchemaFetchTaskExecutorMap
+            .compute(
+                devicePath,
+                (key, value) -> {
+                  if (value == null) {
+                    value = new DeviceSchemaFetchTaskExecutor();
+                    value.incReferenceCount();
+                  }
+                  return value;
+                })
+            .execute(devicePath, measurements);
+    deviceSchemaFetchTaskExecutorMap.compute(
         devicePath,
         (key, value) -> {
-          if (value == null) {
+          if (value == null || value.decAndGetReferenceCount() == 0) {
             return null;
           }
-          shouldWait.set(value.checkAndAddWaitingThread(measurements));
           return value;
         });
-
-    if (!shouldWait.get()) {
-      DeviceSchemaFetchTask task =
-          waitingTaskMap.compute(
-              devicePath,
-              (key, value) -> {
-                if (value == null) {
-                  value = new DeviceSchemaFetchTask();
-                }
-                value.addWaitingThread(measurements);
-                return value;
-              });
-      if (executingTaskMap.get(devicePath) != task) {
-        synchronized (task) {
-          if (executingTaskMap.get(devicePath) != task) {
-            while (true) {
-              if (executingTaskMap.computeIfAbsent(devicePath, key -> task) != task) {
-                break;
-              }
-            }
-            PathPatternTree patternTree = new PathPatternTree();
-            for (String measurement : task.measurementSet) {
-              patternTree.appendFullPath(devicePath, measurement);
-            }
-            ClusterSchemaTree fetchedSchemaTree =
-                executeSchemaFetchQuery(
-                    new SchemaFetchStatement(
-                        patternTree, templateSetInfoProvider.apply(devicePath), false));
-            Pair<AtomicInteger, ClusterSchemaTree> fetchSchemaResult =
-                new Pair<>(new AtomicInteger(task.waitingThreadNum), fetchedSchemaTree);
-            while (true) {
-              if (fetchedResultMap.computeIfAbsent(devicePath, key -> fetchSchemaResult)
-                  != fetchSchemaResult) {
-                break;
-              }
-            }
-          }
-        }
-      }
-    }
-
-    while (true) {
-      if (!fetchedResultMap.containsKey(devicePath)) {
-        break;
-      }
-    }
-    ClusterSchemaTree schemaTree = new ClusterSchemaTree();
-    Pair<AtomicInteger, ClusterSchemaTree> pair = fetchedResultMap.get(devicePath);
-    if (pair.left.decrementAndGet() == 0) {
-      fetchedResultMap.remove(devicePath);
-    }
-
-    return null;
+    return result;
   }
 
   ClusterSchemaTree executeSchemaFetchQuery(SchemaFetchStatement schemaFetchStatement) {
@@ -207,58 +157,91 @@ class ClusterSchemaFetchExecutor {
 
   private class DeviceSchemaFetchTaskExecutor {
 
-    private volatile DeviceSchemaFetchTask waitingTask;
+    // buffer all the requests, waiting for execution
+    private volatile DeviceSchemaFetchTask newTask;
 
+    // task currently being executed
     private volatile DeviceSchemaFetchTask executingTask;
 
-    private volatile int restThreadNum;
-
-    private volatile ClusterSchemaTree fetchedResult;
-
+    // used for concurrency control of read/write operations on newTask
     private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
 
-    private ClusterSchemaTree execute(List<String> measurements) {
-      boolean needNewTask = false;
-      readWriteLock.readLock().lock();
-      try {
-        if (executingTask != null) {
-          needNewTask = !executingTask.checkAndAddWaitingThread(measurements);
-        }
-        if (needNewTask) {
-          DeviceSchemaFetchTask task = waitingTask;
-          synchronized (this) {
-            readWriteLock.readLock().unlock();
-            readWriteLock.writeLock().lock();
-            try {
-              if (waitingTask == null) {
-                waitingTask = new DeviceSchemaFetchTask();
-                task = waitingTask;
+    // thread-safe, protected by deviceSchemaFetchTaskExecutorMap.compute
+    private int referenceCount = 0;
+
+    private ClusterSchemaTree execute(PartialPath devicePath, List<String> measurements) {
+      DeviceSchemaFetchTask task = this.executingTask;
+      // check whether the executing task can cover this request; if so, just wait for that task
+      if (task == null || !task.canCoverRequest(measurements)) {
+        readWriteLock.readLock().lock();
+        try {
+          if ((task = this.newTask) == null) {
+            synchronized (this) {
+              if ((task = this.newTask) == null) {
+                task = this.newTask = new DeviceSchemaFetchTask();
               }
-              waitingTask.addWaitingThread(measurements);
-            } finally {
-              readWriteLock.writeLock().unlock();
             }
           }
-          synchronized (task) {
+          // this operation must be blocked while a task is being submitted,
+          // since requests added after the task has been submitted would be ignored
+          task.addRequest(measurements);
+        } finally {
+          readWriteLock.readLock().unlock();
+        }
+        if (!task.isSubmitting()) {
+          if (task.markSubmitting()) {
+            // only one thread will execute this block
+            while (true) {
+              // waiting for execution
+              if (this.executingTask == null) {
+                // block all request adding operation
+                readWriteLock.writeLock().lock();
+                try {
+                  // mark this task as executing so that new requests are checked against it
+                  this.executingTask = task;
+                  // make this field free for buffering new task
+                  this.newTask = null;
+                  break;
+                } finally {
+                  readWriteLock.writeLock().unlock();
+                }
+              }
+            }
+            // do execution and save fetched result
+            task.saveResult(
+                executeSchemaFetchQuery(
+                    new SchemaFetchStatement(
+                        task.generatePatternTree(devicePath),
+                        templateSetInfoProvider.apply(devicePath),
+                        false)));
+            // release this field for new task execution
+            this.executingTask = null;
           }
         }
-      } finally {
-        readWriteLock.readLock().unlock();
       }
 
-      if (!executingTask.checkAndAddWaitingThread(measurements)) {}
+      // each request takes needed result from the fetched schema tree
+      return task.extractResult(devicePath, measurements);
+    }
+
+    private void incReferenceCount() {
+      referenceCount++;
+    }
 
-      return null;
+    private int decAndGetReferenceCount() {
+      return --referenceCount;
     }
   }
 
   private static class DeviceSchemaFetchTask {
 
-    private int waitingThreadNum = 0;
-
     private final Set<String> measurementSet = new HashSet<>();
 
-    private boolean checkAndAddWaitingThread(List<String> measurements) {
+    private volatile boolean hasSubmitThread = false;
+
+    private volatile ClusterSchemaTree taskResult;
+
+    private boolean canCoverRequest(List<String> measurements) {
       if (measurementSet.size() < measurements.size()) {
         return false;
       }
@@ -267,13 +250,50 @@ class ClusterSchemaFetchExecutor {
           return false;
         }
       }
-      waitingThreadNum++;
+
       return true;
     }
 
-    private void addWaitingThread(List<String> measurements) {
-      waitingThreadNum++;
+    private void addRequest(List<String> measurements) {
       measurementSet.addAll(measurements);
     }
+
+    private boolean isSubmitting() {
+      return hasSubmitThread;
+    }
+
+    private boolean markSubmitting() {
+      if (hasSubmitThread) {
+        return false;
+      }
+      synchronized (this) {
+        if (hasSubmitThread) {
+          return false;
+        }
+        hasSubmitThread = true;
+        return true;
+      }
+    }
+
+    private PathPatternTree generatePatternTree(PartialPath devicePath) {
+      PathPatternTree patternTree = new PathPatternTree();
+      for (String measurement : measurementSet) {
+        patternTree.appendFullPath(devicePath, measurement);
+      }
+      return patternTree;
+    }
+
+    private void saveResult(ClusterSchemaTree clusterSchemaTree) {
+      taskResult = clusterSchemaTree;
+    }
+
+    private ClusterSchemaTree extractResult(PartialPath devicePath, List<String> measurements) {
+      while (true) {
+        if (taskResult != null) {
+          break;
+        }
+      }
+      return taskResult.extractDeviceSubTree(devicePath, measurements);
+    }
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetcher.java
index 52eabcdcc6..6a80668a5a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetcher.java
@@ -124,9 +124,13 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
     }
 
     List<PartialPath> fullPathList = new ArrayList<>();
+    Map<PartialPath, List<String>> deviceMap = new HashMap<>();
     for (PartialPath pattern : pathPatternList) {
       if (!pattern.hasWildcard()) {
         fullPathList.add(pattern);
+        deviceMap
+            .computeIfAbsent(pattern.getDevicePath(), k -> new ArrayList<>())
+            .add(pattern.getMeasurement());
       }
     }
 
@@ -164,9 +168,15 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
         }
       }
 
-      schemaTree =
-          clusterSchemaFetchExecutor.executeSchemaFetchQuery(
-              new SchemaFetchStatement(patternTree, templateMap, false));
+      if (deviceMap.size() == 1) {
+        Map.Entry<PartialPath, List<String>> entry = deviceMap.entrySet().iterator().next();
+        schemaTree =
+            clusterSchemaFetchExecutor.fetchSchemaOfOneDevice(entry.getKey(), entry.getValue());
+      } else {
+        schemaTree =
+            clusterSchemaFetchExecutor.executeSchemaFetchQuery(
+                new SchemaFetchStatement(patternTree, templateMap, false));
+      }
 
       // only cache the schema fetched by full path
       List<MeasurementPath> measurementPathList;
@@ -204,11 +214,12 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
       }
 
       // try fetch the missing schema from remote and cache fetched schema
-      PathPatternTree patternTree = new PathPatternTree();
-      for (int index : indexOfMissingMeasurements) {
-        patternTree.appendFullPath(devicePath, measurements[index]);
-      }
-      ClusterSchemaTree remoteSchemaTree = fetchSchemaFromRemote(patternTree);
+      ClusterSchemaTree remoteSchemaTree =
+          clusterSchemaFetchExecutor.fetchSchemaOfOneDevice(
+              devicePath,
+              indexOfMissingMeasurements.stream()
+                  .map(index -> measurements[index])
+                  .collect(Collectors.toList()));
       if (!remoteSchemaTree.isEmpty()) {
         schemaTree.mergeSchemaTree(remoteSchemaTree);
         schemaCache.put(remoteSchemaTree);