You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by yu...@apache.org on 2020/07/06 12:03:07 UTC

[incubator-iotdb] branch kyy2 updated: check path before delete

This is an automated email from the ASF dual-hosted git repository.

yuyuankang pushed a commit to branch kyy2
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/kyy2 by this push:
     new 5c12808  check path before delete
5c12808 is described below

commit 5c128087e2d9ceee43bfed4bbb3e92145e55b77a
Author: Ring-k <yu...@hotmail.com>
AuthorDate: Mon Jul 6 20:02:34 2020 +0800

    check path before delete
---
 .../cluster/server/member/MetaGroupMember.java     | 70 +++++++++++++++++++++-
 .../iotdb/cluster/server/member/RaftMember.java    |  2 +-
 .../db/qp/physical/sys/DeleteTimeSeriesPlan.java   |  4 ++
 3 files changed, 72 insertions(+), 4 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
index cb97fb4..2fba057 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
@@ -47,6 +47,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Objects;
 import java.util.Set;
+import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -151,6 +152,7 @@ import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
 import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.DeleteTimeSeriesPlan;
 import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
 import org.apache.iotdb.db.query.aggregation.AggregateResult;
 import org.apache.iotdb.db.query.aggregation.AggregationType;
@@ -1402,7 +1404,7 @@ public class MetaGroupMember extends RaftMember implements TSMetaService.AsyncIf
         applySnapshotUsers(authorizer, snapshot);
         applySnapshotRoles(authorizer, snapshot);
       } catch (AuthException e) {
-        logger.error("{}: Cannot get authorizer instance, error is: {}", name, e);
+        logger.error("{}: Cannot get authorizer instance, error is: ", name, e);
       }
 
       // 5. accept partition table
@@ -1513,6 +1515,15 @@ public class MetaGroupMember extends RaftMember implements TSMetaService.AsyncIf
    * @return
    */
   private TSStatus processNonPartitionedDataPlan(PhysicalPlan plan) {
+    if (plan instanceof DeleteTimeSeriesPlan) {
+      try {
+        plan = getDeleteTimeseriesPlanWithFullPaths((DeleteTimeSeriesPlan) plan);
+      } catch (PathNotExistException e) {
+        TSStatus tsStatus = StatusUtils.EXECUTE_STATEMENT_ERROR.deepCopy();
+        tsStatus.setMessage(e.getMessage());
+        return tsStatus;
+      }
+    }
     try {
       syncLeaderWithConsistencyCheck();
       List<PartitionGroup> globalGroups = partitionTable.getGlobalGroups();
@@ -1525,6 +1536,22 @@ public class MetaGroupMember extends RaftMember implements TSMetaService.AsyncIf
     }
   }
 
+  /** Replaces the plan's possibly-wildcarded paths with the full paths they match, in place. */
+  DeleteTimeSeriesPlan getDeleteTimeseriesPlanWithFullPaths(DeleteTimeSeriesPlan plan)
+      throws PathNotExistException {
+    Pair<List<String>, List<String>> matched = getMatchedPaths(plan.getPathsStrings());
+    // refuse the whole plan as soon as one requested path is reported as invalid
+    if (!matched.right.isEmpty()) {
+      throw new PathNotExistException(new ArrayList<>(matched.right));
+    }
+    List<Path> expandedPaths = new ArrayList<>();
+    for (String fullPath : matched.left) {
+      expandedPaths.add(new Path(fullPath));
+    }
+    plan.setPaths(expandedPaths);
+    return plan;
+  }
+
   /**
    * A partitioned plan (like batch insertion) will be split into several sub-plans, each belongs to
    * data group. And these sub-plans will be sent to and executed on the corresponding groups
@@ -1611,7 +1638,7 @@ public class MetaGroupMember extends RaftMember implements TSMetaService.AsyncIf
               setStorageGroupResult.getCode(), storageGroupName)
       );
     }
-    if(plan instanceof InsertRowPlan){
+    if (plan instanceof InsertRowPlan) {
       // try to create timeseries
       boolean isAutoCreateTimeseriesSuccess = autoCreateTimeseries((InsertRowPlan) plan);
       if (!isAutoCreateTimeseriesSuccess) {
@@ -2003,7 +2030,8 @@ public class MetaGroupMember extends RaftMember implements TSMetaService.AsyncIf
       if (logger.isDebugEnabled()) {
         logger.debug("{}: Pulled {} timeseries schemas of {} and other {} paths from {} of {}",
             name,
-            schemas.size(), request.getPrefixPaths().get(0), request.getPrefixPaths().size() - 1, node,
+            schemas.size(), request.getPrefixPaths().get(0), request.getPrefixPaths().size() - 1,
+            node,
             request.getHeader());
       }
       results.addAll(schemas);
@@ -2615,6 +2643,65 @@ public class MetaGroupMember extends RaftMember implements TSMetaService.AsyncIf
   }
 
   /**
+   * Resolve a list of paths, each potentially containing wildcards, into the full paths they
+   * match. Each original path is resolved by its own task in a temporary thread pool, and this
+   * method waits at most RaftServer.getQueryTimeoutInSec() seconds for the tasks to finish.
+   *
+   * <p>An original path is reported as invalid unless a task resolved it to at least one full
+   * path. This covers paths that match nothing, paths whose resolution failed with an exception,
+   * and paths that were still unresolved when the wait timed out or was interrupted, so that no
+   * requested path is silently missing from both lists.
+   *
+   * @param originalPaths a list of paths, potentially with wildcard
+   * @return a pair of path lists, the first are the existing full paths, the second are invalid
+   * original paths
+   */
+  public Pair<List<String>, List<String>> getMatchedPaths(List<String> originalPaths) {
+    if (originalPaths.isEmpty()) {
+      return new Pair<>(new ArrayList<>(), new ArrayList<>());
+    }
+    ConcurrentSkipListSet<String> fullPaths = new ConcurrentSkipListSet<>();
+    // a path stays in this set until a task proves that it matches at least one full path
+    ConcurrentSkipListSet<String> unresolvedPaths = new ConcurrentSkipListSet<>(originalPaths);
+    // newFixedThreadPool rejects a pool size of 0, so request at least one thread
+    ExecutorService getAllPathsService = Executors
+        .newFixedThreadPool(Math.max(1, partitionTable.getGlobalGroups().size()));
+    for (String pathStr : originalPaths) {
+      getAllPathsService.submit(() -> {
+        try {
+          List<String> fullPathStrs = getMatchedPaths(pathStr);
+          if (fullPathStrs.isEmpty()) {
+            logger.error("Path {} is not found.", pathStr);
+          } else {
+            // publish the matches before marking the path as resolved, so that a path missing
+            // from the unresolved set always has its matches in fullPaths
+            fullPaths.addAll(fullPathStrs);
+            unresolvedPaths.remove(pathStr);
+          }
+        } catch (MetadataException e) {
+          logger.error("Failed to get full paths of the prefix path: {} because", pathStr, e);
+        }
+      });
+    }
+    getAllPathsService.shutdown();
+    try {
+      if (!getAllPathsService
+          .awaitTermination(RaftServer.getQueryTimeoutInSec(), TimeUnit.SECONDS)) {
+        logger.error("Timed out when waiting for get all paths services to stop");
+        getAllPathsService.shutdownNow();
+      }
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      logger.error("Unexpected interruption when waiting for get all paths services to stop", e);
+      getAllPathsService.shutdownNow();
+    }
+    // snapshot the unresolved paths before the full paths: a task that finishes in between can
+    // then only add extra full paths, it can never make a path vanish from both lists
+    List<String> nonExistPaths = new ArrayList<>(unresolvedPaths);
+    return new Pair<>(new ArrayList<>(fullPaths), nonExistPaths);
+  }
+
+  /**
    * Get all devices after removing wildcards in the path
    *
    * @param originPath a path potentially with wildcard
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index d775e04..71b2486 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -1027,7 +1027,7 @@ public abstract class RaftMember implements RaftService.AsyncIface {
     return StatusUtils.TIME_OUT;
   }
 
-  private Throwable getRootCause(Throwable e) {
+  Throwable getRootCause(Throwable e) {
     Throwable curr = e;
     while (curr.getCause() != null) {
       curr = curr.getCause();
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/DeleteTimeSeriesPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/DeleteTimeSeriesPlan.java
index 16b647a..97af723 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/DeleteTimeSeriesPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/DeleteTimeSeriesPlan.java
@@ -82,4 +82,13 @@ public class DeleteTimeSeriesPlan extends PhysicalPlan {
       deletePathList.add(new Path(readString(buffer)));
     }
   }
+
+  /**
+   * Replace the path list held by this plan (the deletePathList field).
+   *
+   * @param fullPaths the new path list; the reference is stored as-is, without copying
+   */
+  public void setPaths(List<Path> fullPaths) {
+    deletePathList = fullPaths;
+  }
 }