You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by yu...@apache.org on 2020/07/06 13:17:14 UTC
[incubator-iotdb] branch kyy3 updated: check path
This is an automated email from the ASF dual-hosted git repository.
yuyuankang pushed a commit to branch kyy3
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/kyy3 by this push:
new 947247b check path
947247b is described below
commit 947247ba85cf1e3ff8a1b95bd875fcd7b1aa83f1
Author: Ring-k <yu...@hotmail.com>
AuthorDate: Mon Jul 6 21:16:49 2020 +0800
check path
---
.../cluster/server/member/MetaGroupMember.java | 64 ++++++++++++++++++++++
.../db/qp/physical/sys/DeleteTimeSeriesPlan.java | 4 ++
2 files changed, 68 insertions(+)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
index 6fb2f58..51c036b 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
@@ -47,6 +47,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
+import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -151,6 +152,7 @@ import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.DeleteTimeSeriesPlan;
import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
import org.apache.iotdb.db.query.aggregation.AggregateResult;
import org.apache.iotdb.db.query.aggregation.AggregationType;
@@ -1513,6 +1515,15 @@ public class MetaGroupMember extends RaftMember implements TSMetaService.AsyncIf
* @return
*/
private TSStatus processNonPartitionedDataPlan(PhysicalPlan plan) {
+ if (plan instanceof DeleteTimeSeriesPlan) {
+ try {
+ plan = getDeleteTimeseriesPlanWithFullPaths((DeleteTimeSeriesPlan) plan);
+ } catch (PathNotExistException e) {
+ TSStatus tsStatus = StatusUtils.EXECUTE_STATEMENT_ERROR.deepCopy();
+ tsStatus.setMessage(e.getMessage());
+ return tsStatus;
+ }
+ }
try {
syncLeaderWithConsistencyCheck();
List<PartitionGroup> globalGroups = partitionTable.getGlobalGroups();
@@ -1525,6 +1536,22 @@ public class MetaGroupMember extends RaftMember implements TSMetaService.AsyncIf
}
}
+  /**
+   * Replace the (possibly wildcard) paths of a delete-timeseries plan with the full paths they
+   * match. The given plan is modified in place and returned.
+   *
+   * @param plan the plan whose path strings are resolved through getMatchedPaths
+   * @return the same plan instance, now holding the resolved full paths
+   * @throws PathNotExistException if getMatchedPaths reports at least one of the plan's paths as
+   *     matching nothing
+   */
+  DeleteTimeSeriesPlan getDeleteTimeseriesPlanWithFullPaths(DeleteTimeSeriesPlan plan)
+      throws PathNotExistException {
+    Pair<List<String>, List<String>> matched = getMatchedPaths(plan.getPathsStrings());
+    List<String> missing = matched.right;
+    // refuse the whole plan as soon as one requested path cannot be resolved
+    if (!missing.isEmpty()) {
+      throw new PathNotExistException(new ArrayList<>(missing));
+    }
+    List<Path> resolved = new ArrayList<>();
+    matched.left.forEach(fullPath -> resolved.add(new Path(fullPath)));
+    plan.setPaths(resolved);
+    return plan;
+  }
+
/**
* A partitioned plan (like batch insertion) will be split into several sub-plans, each of which
* belongs to a data group. These sub-plans will be sent to and executed on the corresponding groups
@@ -2615,6 +2642,43 @@ public class MetaGroupMember extends RaftMember implements TSMetaService.AsyncIf
}
/**
+ * Get all paths after removing wildcards in the path
+ *
+ * @param originalPaths, a list of paths, potentially with wildcard
+ * @return a pair of path lists, the first are the existing full paths, the second are invalid
+ * original paths
+ */
+ public Pair<List<String>, List<String>> getMatchedPaths(List<String> originalPaths) {
+ ConcurrentSkipListSet<String> fullPaths = new ConcurrentSkipListSet<>();
+ ConcurrentSkipListSet<String> nonExistPaths = new ConcurrentSkipListSet<>();
+ ExecutorService getAllPathsService = Executors
+ .newFixedThreadPool(partitionTable.getGlobalGroups().size());
+ for (String pathStr : originalPaths) {
+ getAllPathsService.submit(() -> {
+ try {
+ List<String> fullPathStrs = getMatchedPaths(pathStr);
+ if (fullPathStrs.isEmpty()) {
+ nonExistPaths.add(pathStr);
+ logger.error("Path {} is not found.", pathStr);
+ }
+ fullPaths.addAll(fullPathStrs);
+ } catch (MetadataException e) {
+ logger.error("Failed to get full paths of the prefix path: {} because", pathStr, e);
+ }
+ });
+ }
+ getAllPathsService.shutdown();
+ try {
+ getAllPathsService.awaitTermination(RaftServer.getQueryTimeoutInSec(), TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ logger.error("Unexpected interruption when waiting for get all paths services to stop", e);
+ }
+ return new Pair<>(new ArrayList<>(fullPaths), new ArrayList<>(nonExistPaths));
+ }
+
+
+ /**
* Get all devices after removing wildcards in the path
*
* @param originPath a path potentially with wildcard
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/DeleteTimeSeriesPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/DeleteTimeSeriesPlan.java
index 16b647a..97af723 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/DeleteTimeSeriesPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/DeleteTimeSeriesPlan.java
@@ -82,4 +82,8 @@ public class DeleteTimeSeriesPlan extends PhysicalPlan {
deletePathList.add(new Path(readString(buffer)));
}
}
+
+  /**
+   * Replace this plan's list of paths to delete ({@code deletePathList}). The given list is
+   * stored as is, without copying.
+   *
+   * @param fullPaths the new list of paths for this plan
+   */
+  public void setPaths(List<Path> fullPaths) {
+    deletePathList = fullPaths;
+  }
}