You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by yu...@apache.org on 2020/07/06 12:03:07 UTC
[incubator-iotdb] branch kyy2 updated: check path before delete
This is an automated email from the ASF dual-hosted git repository.
yuyuankang pushed a commit to branch kyy2
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/kyy2 by this push:
new 5c12808 check path before delete
5c12808 is described below
commit 5c128087e2d9ceee43bfed4bbb3e92145e55b77a
Author: Ring-k <yu...@hotmail.com>
AuthorDate: Mon Jul 6 20:02:34 2020 +0800
check path before delete
---
.../cluster/server/member/MetaGroupMember.java | 70 +++++++++++++++++++++-
.../iotdb/cluster/server/member/RaftMember.java | 2 +-
.../db/qp/physical/sys/DeleteTimeSeriesPlan.java | 4 ++
3 files changed, 72 insertions(+), 4 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
index cb97fb4..2fba057 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
@@ -47,6 +47,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
+import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -151,6 +152,7 @@ import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.DeleteTimeSeriesPlan;
import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
import org.apache.iotdb.db.query.aggregation.AggregateResult;
import org.apache.iotdb.db.query.aggregation.AggregationType;
@@ -1402,7 +1404,7 @@ public class MetaGroupMember extends RaftMember implements TSMetaService.AsyncIf
applySnapshotUsers(authorizer, snapshot);
applySnapshotRoles(authorizer, snapshot);
} catch (AuthException e) {
- logger.error("{}: Cannot get authorizer instance, error is: {}", name, e);
+ logger.error("{}: Cannot get authorizer instance, error is: ", name, e);
}
// 5. accept partition table
@@ -1513,6 +1515,15 @@ public class MetaGroupMember extends RaftMember implements TSMetaService.AsyncIf
* @return
*/
private TSStatus processNonPartitionedDataPlan(PhysicalPlan plan) {
+ if (plan instanceof DeleteTimeSeriesPlan) {
+ try {
+ plan = getDeleteTimeseriesPlanWithFullPaths((DeleteTimeSeriesPlan) plan);
+ } catch (PathNotExistException e) {
+ TSStatus tsStatus = StatusUtils.EXECUTE_STATEMENT_ERROR.deepCopy();
+ tsStatus.setMessage(e.getMessage());
+ return tsStatus;
+ }
+ }
try {
syncLeaderWithConsistencyCheck();
List<PartitionGroup> globalGroups = partitionTable.getGlobalGroups();
@@ -1525,6 +1536,22 @@ public class MetaGroupMember extends RaftMember implements TSMetaService.AsyncIf
}
}
+ /**
+  * Expands every (possibly wildcarded) path of a delete-timeseries plan into the concrete paths
+  * it matches and stores the result back into the same plan.
+  *
+  * @param plan the deletion plan whose paths are to be expanded; it is modified in place
+  * @return the given plan, now carrying the matched full paths
+  * @throws PathNotExistException if at least one of the plan's paths matches nothing
+  */
+ DeleteTimeSeriesPlan getDeleteTimeseriesPlanWithFullPaths(DeleteTimeSeriesPlan plan)
+     throws PathNotExistException {
+   Pair<List<String>, List<String>> matchResult = getMatchedPaths(plan.getPathsStrings());
+   if (!matchResult.right.isEmpty()) {
+     // reject the whole deletion as soon as one requested path is unknown
+     throw new PathNotExistException(new ArrayList<>(matchResult.right));
+   }
+   List<Path> resolvedPaths = new ArrayList<>();
+   for (String resolved : matchResult.left) {
+     resolvedPaths.add(new Path(resolved));
+   }
+   plan.setPaths(resolvedPaths);
+   return plan;
+ }
+
/**
* A partitioned plan (like batch insertion) will be split into several sub-plans, each belongs to
* data group. And these sub-plans will be sent to and executed on the corresponding groups
@@ -1611,7 +1638,7 @@ public class MetaGroupMember extends RaftMember implements TSMetaService.AsyncIf
setStorageGroupResult.getCode(), storageGroupName)
);
}
- if(plan instanceof InsertRowPlan){
+ if (plan instanceof InsertRowPlan) {
// try to create timeseries
boolean isAutoCreateTimeseriesSuccess = autoCreateTimeseries((InsertRowPlan) plan);
if (!isAutoCreateTimeseriesSuccess) {
@@ -2003,7 +2030,8 @@ public class MetaGroupMember extends RaftMember implements TSMetaService.AsyncIf
if (logger.isDebugEnabled()) {
logger.debug("{}: Pulled {} timeseries schemas of {} and other {} paths from {} of {}",
name,
- schemas.size(), request.getPrefixPaths().get(0), request.getPrefixPaths().size() - 1, node,
+ schemas.size(), request.getPrefixPaths().get(0), request.getPrefixPaths().size() - 1,
+ node,
request.getHeader());
}
results.addAll(schemas);
@@ -2615,6 +2643,42 @@ public class MetaGroupMember extends RaftMember implements TSMetaService.AsyncIf
}
/**
+ * Get all paths after removing wildcards in the path
+ *
+ * @param originalPaths a list of paths, potentially with wildcards
+ * @return a pair of path lists: the first holds the existing full paths, the second holds the
+ * original paths that matched nothing
+ */
+ public Pair<List<String>, List<String>> getMatchedPaths(List<String> originalPaths) {
+ ConcurrentSkipListSet<String> fullPaths = new ConcurrentSkipListSet<>();
+ ConcurrentSkipListSet<String> nonExistPaths = new ConcurrentSkipListSet<>();
+ ExecutorService getAllPathsService = Executors
+ .newFixedThreadPool(partitionTable.getGlobalGroups().size());
+ for (String pathStr : originalPaths) {
+ getAllPathsService.submit(() -> {
+ try {
+ List<String> fullPathStrs = getMatchedPaths(pathStr);
+ if (fullPathStrs.isEmpty()) {
+ nonExistPaths.add(pathStr);
+ logger.error("Path {} is not found.", pathStr);
+ }
+ fullPaths.addAll(fullPathStrs);
+ } catch (MetadataException e) {
+ logger.error("Failed to get full paths of the prefix path: {} because", pathStr, e);
+ }
+ });
+ }
+ getAllPathsService.shutdown();
+ try {
+ getAllPathsService.awaitTermination(RaftServer.getQueryTimeoutInSec(), TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ logger.error("Unexpected interruption when waiting for get all paths services to stop", e);
+ }
+ return new Pair<>(new ArrayList<>(fullPaths), new ArrayList<>(nonExistPaths));
+ }
+
+ /**
* Get all devices after removing wildcards in the path
*
* @param originPath a path potentially with wildcard
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index d775e04..71b2486 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -1027,7 +1027,7 @@ public abstract class RaftMember implements RaftService.AsyncIface {
return StatusUtils.TIME_OUT;
}
- private Throwable getRootCause(Throwable e) {
+ Throwable getRootCause(Throwable e) {
Throwable curr = e;
while (curr.getCause() != null) {
curr = curr.getCause();
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/DeleteTimeSeriesPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/DeleteTimeSeriesPlan.java
index 16b647a..97af723 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/DeleteTimeSeriesPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/DeleteTimeSeriesPlan.java
@@ -82,4 +82,8 @@ public class DeleteTimeSeriesPlan extends PhysicalPlan {
deletePathList.add(new Path(readString(buffer)));
}
}
+
+ /**
+  * Replaces the list of paths held by this plan.
+  *
+  * @param fullPaths the new path list; it is stored as given, without copying
+  */
+ public void setPaths(List<Path> fullPaths) {
+   deletePathList = fullPaths;
+ }
}