You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by zy...@apache.org on 2022/12/15 09:44:04 UTC
[iotdb] 07/08: basically implement
This is an automated email from the ASF dual-hosted git repository.
zyk pushed a commit to branch concurrent_schema_fetch
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit ac49e5c19cbfca2f753aa52ebcf9210db0c70532
Author: Marccos <15...@qq.com>
AuthorDate: Thu Dec 15 14:42:38 2022 +0800
basically implement
---
.../mpp/common/schematree/ClusterSchemaTree.java | 40 ++++
.../analyze/schema/ClusterSchemaFetchExecutor.java | 222 +++++++++++----------
.../plan/analyze/schema/ClusterSchemaFetcher.java | 27 ++-
3 files changed, 180 insertions(+), 109 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/ClusterSchemaTree.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/ClusterSchemaTree.java
index d1581176ca..9556086fab 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/ClusterSchemaTree.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/ClusterSchemaTree.java
@@ -325,4 +325,44 @@ public class ClusterSchemaTree implements ISchemaTree {
public boolean isEmpty() {
return root.getChildren() == null || root.getChildren().size() == 0;
}
+
+ public ClusterSchemaTree extractDeviceSubTree(PartialPath devicePath, List<String> measurements) {
+ SchemaNode root = new SchemaInternalNode(PATH_ROOT);
+ String[] nodes = devicePath.getNodes();
+ SchemaNode cur = root;
+ SchemaNode clonedCur = root;
+ SchemaNode clonedChild;
+ for (int i = 1; i < nodes.length - 1; i++) {
+ cur = cur.getChild(nodes[i]);
+ if (cur == null) {
+ return new ClusterSchemaTree();
+ }
+
+ clonedChild = new SchemaInternalNode(cur.getName());
+ clonedCur.addChild(nodes[i], clonedChild);
+ clonedCur = clonedChild;
+ }
+
+ cur = cur.getChild(nodes[nodes.length - 1]);
+ if (cur == null || !cur.isEntity()) {
+ return new ClusterSchemaTree();
+ }
+
+ SchemaEntityNode deviceNode = cur.getAsEntityNode();
+ SchemaEntityNode clonedDeviceNode = new SchemaEntityNode(nodes[nodes.length - 1]);
+ clonedDeviceNode.setAligned(deviceNode.isAligned());
+ SchemaNode child;
+ SchemaMeasurementNode measurementNode;
+ for (String measurement : measurements) {
+ child = deviceNode.getChild(measurement);
+ if (child.isMeasurement()) {
+ measurementNode = child.getAsMeasurementNode();
+ clonedDeviceNode.addChild(measurementNode.getName(), measurementNode);
+ if (measurementNode.getAlias() != null) {
+ clonedDeviceNode.addAliasChild(measurementNode.getAlias(), measurementNode);
+ }
+ }
+ }
+ return new ClusterSchemaTree(root);
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetchExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetchExecutor.java
index 7f25b2f640..a76f1a23c0 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetchExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetchExecutor.java
@@ -34,7 +34,6 @@ import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.read.common.block.column.Column;
import org.apache.iotdb.tsfile.utils.Binary;
-import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import java.io.ByteArrayInputStream;
@@ -46,8 +45,6 @@ import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.BiFunction;
@@ -61,14 +58,9 @@ class ClusterSchemaFetchExecutor {
private final BiFunction<Long, Statement, ExecutionResult> statementExecutor;
private final Function<PartialPath, Map<Integer, Template>> templateSetInfoProvider;
- private final Map<PartialPath, Pair<AtomicInteger, ClusterSchemaTree>> fetchedResultMap =
+ private final Map<PartialPath, DeviceSchemaFetchTaskExecutor> deviceSchemaFetchTaskExecutorMap =
new ConcurrentHashMap<>();
- private final Map<PartialPath, DeviceSchemaFetchTask> executingTaskMap =
- new ConcurrentHashMap<>();
-
- private final Map<PartialPath, DeviceSchemaFetchTask> waitingTaskMap = new ConcurrentHashMap<>();
-
ClusterSchemaFetchExecutor(
Coordinator coordinator,
Supplier<Long> queryIdProvider,
@@ -81,69 +73,27 @@ class ClusterSchemaFetchExecutor {
}
ClusterSchemaTree fetchSchemaOfOneDevice(PartialPath devicePath, List<String> measurements) {
- final AtomicBoolean shouldWait = new AtomicBoolean(true);
- executingTaskMap.compute(
+ ClusterSchemaTree result =
+ deviceSchemaFetchTaskExecutorMap
+ .compute(
+ devicePath,
+ (key, value) -> {
+ if (value == null) {
+ value = new DeviceSchemaFetchTaskExecutor();
+ value.incReferenceCount();
+ }
+ return value;
+ })
+ .execute(devicePath, measurements);
+ deviceSchemaFetchTaskExecutorMap.compute(
devicePath,
(key, value) -> {
- if (value == null) {
+ if (value == null || value.decAndGetReferenceCount() == 0) {
return null;
}
- shouldWait.set(value.checkAndAddWaitingThread(measurements));
return value;
});
-
- if (!shouldWait.get()) {
- DeviceSchemaFetchTask task =
- waitingTaskMap.compute(
- devicePath,
- (key, value) -> {
- if (value == null) {
- value = new DeviceSchemaFetchTask();
- }
- value.addWaitingThread(measurements);
- return value;
- });
- if (executingTaskMap.get(devicePath) != task) {
- synchronized (task) {
- if (executingTaskMap.get(devicePath) != task) {
- while (true) {
- if (executingTaskMap.computeIfAbsent(devicePath, key -> task) != task) {
- break;
- }
- }
- PathPatternTree patternTree = new PathPatternTree();
- for (String measurement : task.measurementSet) {
- patternTree.appendFullPath(devicePath, measurement);
- }
- ClusterSchemaTree fetchedSchemaTree =
- executeSchemaFetchQuery(
- new SchemaFetchStatement(
- patternTree, templateSetInfoProvider.apply(devicePath), false));
- Pair<AtomicInteger, ClusterSchemaTree> fetchSchemaResult =
- new Pair<>(new AtomicInteger(task.waitingThreadNum), fetchedSchemaTree);
- while (true) {
- if (fetchedResultMap.computeIfAbsent(devicePath, key -> fetchSchemaResult)
- != fetchSchemaResult) {
- break;
- }
- }
- }
- }
- }
- }
-
- while (true) {
- if (!fetchedResultMap.containsKey(devicePath)) {
- break;
- }
- }
- ClusterSchemaTree schemaTree = new ClusterSchemaTree();
- Pair<AtomicInteger, ClusterSchemaTree> pair = fetchedResultMap.get(devicePath);
- if (pair.left.decrementAndGet() == 0) {
- fetchedResultMap.remove(devicePath);
- }
-
- return null;
+ return result;
}
ClusterSchemaTree executeSchemaFetchQuery(SchemaFetchStatement schemaFetchStatement) {
@@ -207,58 +157,91 @@ class ClusterSchemaFetchExecutor {
private class DeviceSchemaFetchTaskExecutor {
- private volatile DeviceSchemaFetchTask waitingTask;
+ // buffer all the requests, waiting for execution
+ private volatile DeviceSchemaFetchTask newTask;
+ // task on executing
private volatile DeviceSchemaFetchTask executingTask;
- private volatile int restThreadNum;
-
- private volatile ClusterSchemaTree fetchedResult;
-
+ // used for concurrent control of newTask R/W operation
private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
- private ClusterSchemaTree execute(List<String> measurements) {
- boolean needNewTask = false;
- readWriteLock.readLock().lock();
- try {
- if (executingTask != null) {
- needNewTask = !executingTask.checkAndAddWaitingThread(measurements);
- }
- if (needNewTask) {
- DeviceSchemaFetchTask task = waitingTask;
- synchronized (this) {
- readWriteLock.readLock().unlock();
- readWriteLock.writeLock().lock();
- try {
- if (waitingTask == null) {
- waitingTask = new DeviceSchemaFetchTask();
- task = waitingTask;
+ // thread-safe, protected by deviceSchemaFetchTaskExecutorMap.compute
+ private int referenceCount = 0;
+
+ private ClusterSchemaTree execute(PartialPath devicePath, List<String> measurements) {
+ DeviceSchemaFetchTask task = this.executingTask;
+ // check whether the executing task can cover this request, if so, just waiting for this task
+ if (task == null || !task.canCoverRequest(measurements)) {
+ readWriteLock.readLock().lock();
+ try {
+ if ((task = this.newTask) == null) {
+ synchronized (this) {
+ if ((task = this.newTask) == null) {
+ task = this.newTask = new DeviceSchemaFetchTask();
}
- waitingTask.addWaitingThread(measurements);
- } finally {
- readWriteLock.writeLock().unlock();
}
}
- synchronized (task) {
+ // this operation shall be blocked by task submitting operation
+ // since once the task has been submitted, the info new added requests will be invalid
+ task.addRequest(measurements);
+ } finally {
+ readWriteLock.readLock().unlock();
+ }
+ if (!task.isSubmitting()) {
+ if (task.markSubmitting()) {
+ // only one thread will execute this block
+ while (true) {
+ // waiting for execution
+ if (this.executingTask == null) {
+ // block all request adding operation
+ readWriteLock.writeLock().lock();
+ try {
+ // make this task as executing state for new request check
+ this.executingTask = task;
+ // make this field free for buffering new task
+ this.newTask = null;
+ break;
+ } finally {
+ readWriteLock.writeLock().unlock();
+ }
+ }
+ }
+ // do execution and save fetched result
+ task.saveResult(
+ executeSchemaFetchQuery(
+ new SchemaFetchStatement(
+ task.generatePatternTree(devicePath),
+ templateSetInfoProvider.apply(devicePath),
+ false)));
+ // release this field for new task execution
+ this.executingTask = null;
}
}
- } finally {
- readWriteLock.readLock().unlock();
}
- if (!executingTask.checkAndAddWaitingThread(measurements)) {}
+ // each request takes needed result from the fetched schema tree
+ return task.extractResult(devicePath, measurements);
+ }
+
+ private void incReferenceCount() {
+ referenceCount++;
+ }
- return null;
+ private int decAndGetReferenceCount() {
+ return --referenceCount;
}
}
private static class DeviceSchemaFetchTask {
- private int waitingThreadNum = 0;
-
private final Set<String> measurementSet = new HashSet<>();
- private boolean checkAndAddWaitingThread(List<String> measurements) {
+ private volatile boolean hasSubmitThread = false;
+
+ private volatile ClusterSchemaTree taskResult;
+
+ private boolean canCoverRequest(List<String> measurements) {
if (measurementSet.size() < measurements.size()) {
return false;
}
@@ -267,13 +250,50 @@ class ClusterSchemaFetchExecutor {
return false;
}
}
- waitingThreadNum++;
+
return true;
}
- private void addWaitingThread(List<String> measurements) {
- waitingThreadNum++;
+ private void addRequest(List<String> measurements) {
measurementSet.addAll(measurements);
}
+
+ private boolean isSubmitting() {
+ return hasSubmitThread;
+ }
+
+ private boolean markSubmitting() {
+ if (hasSubmitThread) {
+ return false;
+ }
+ synchronized (this) {
+ if (hasSubmitThread) {
+ return false;
+ }
+ hasSubmitThread = true;
+ return true;
+ }
+ }
+
+ private PathPatternTree generatePatternTree(PartialPath devicePath) {
+ PathPatternTree patternTree = new PathPatternTree();
+ for (String measurement : measurementSet) {
+ patternTree.appendFullPath(devicePath, measurement);
+ }
+ return patternTree;
+ }
+
+ private void saveResult(ClusterSchemaTree clusterSchemaTree) {
+ taskResult = clusterSchemaTree;
+ }
+
+ private ClusterSchemaTree extractResult(PartialPath devicePath, List<String> measurements) {
+ while (true) {
+ if (taskResult != null) {
+ break;
+ }
+ }
+ return taskResult.extractDeviceSubTree(devicePath, measurements);
+ }
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetcher.java
index 52eabcdcc6..6a80668a5a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetcher.java
@@ -124,9 +124,13 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
}
List<PartialPath> fullPathList = new ArrayList<>();
+ Map<PartialPath, List<String>> deviceMap = new HashMap<>();
for (PartialPath pattern : pathPatternList) {
if (!pattern.hasWildcard()) {
fullPathList.add(pattern);
+ deviceMap
+ .computeIfAbsent(pattern.getDevicePath(), k -> new ArrayList<>())
+ .add(pattern.getMeasurement());
}
}
@@ -164,9 +168,15 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
}
}
- schemaTree =
- clusterSchemaFetchExecutor.executeSchemaFetchQuery(
- new SchemaFetchStatement(patternTree, templateMap, false));
+ if (deviceMap.size() == 1) {
+ Map.Entry<PartialPath, List<String>> entry = deviceMap.entrySet().iterator().next();
+ schemaTree =
+ clusterSchemaFetchExecutor.fetchSchemaOfOneDevice(entry.getKey(), entry.getValue());
+ } else {
+ schemaTree =
+ clusterSchemaFetchExecutor.executeSchemaFetchQuery(
+ new SchemaFetchStatement(patternTree, templateMap, false));
+ }
// only cache the schema fetched by full path
List<MeasurementPath> measurementPathList;
@@ -204,11 +214,12 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
}
// try fetch the missing schema from remote and cache fetched schema
- PathPatternTree patternTree = new PathPatternTree();
- for (int index : indexOfMissingMeasurements) {
- patternTree.appendFullPath(devicePath, measurements[index]);
- }
- ClusterSchemaTree remoteSchemaTree = fetchSchemaFromRemote(patternTree);
+ ClusterSchemaTree remoteSchemaTree =
+ clusterSchemaFetchExecutor.fetchSchemaOfOneDevice(
+ devicePath,
+ indexOfMissingMeasurements.stream()
+ .map(index -> measurements[index])
+ .collect(Collectors.toList()));
if (!remoteSchemaTree.isEmpty()) {
schemaTree.mergeSchemaTree(remoteSchemaTree);
schemaCache.put(remoteSchemaTree);