You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by yu...@apache.org on 2020/07/06 13:17:14 UTC

[incubator-iotdb] branch kyy3 updated: check path

This is an automated email from the ASF dual-hosted git repository.

yuyuankang pushed a commit to branch kyy3
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/kyy3 by this push:
     new 947247b  check path
947247b is described below

commit 947247ba85cf1e3ff8a1b95bd875fcd7b1aa83f1
Author: Ring-k <yu...@hotmail.com>
AuthorDate: Mon Jul 6 21:16:49 2020 +0800

    check path
---
 .../cluster/server/member/MetaGroupMember.java     | 64 ++++++++++++++++++++++
 .../db/qp/physical/sys/DeleteTimeSeriesPlan.java   |  4 ++
 2 files changed, 68 insertions(+)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
index 6fb2f58..51c036b 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
@@ -47,6 +47,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Objects;
 import java.util.Set;
+import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -151,6 +152,7 @@ import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
 import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.DeleteTimeSeriesPlan;
 import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
 import org.apache.iotdb.db.query.aggregation.AggregateResult;
 import org.apache.iotdb.db.query.aggregation.AggregationType;
@@ -1513,6 +1515,15 @@ public class MetaGroupMember extends RaftMember implements TSMetaService.AsyncIf
    * @return
    */
   private TSStatus processNonPartitionedDataPlan(PhysicalPlan plan) {
+    if (plan instanceof DeleteTimeSeriesPlan) {
+      try {
+        plan = getDeleteTimeseriesPlanWithFullPaths((DeleteTimeSeriesPlan) plan);
+      } catch (PathNotExistException e) {
+        TSStatus tsStatus = StatusUtils.EXECUTE_STATEMENT_ERROR.deepCopy();
+        tsStatus.setMessage(e.getMessage());
+        return tsStatus;
+      }
+    }
     try {
       syncLeaderWithConsistencyCheck();
       List<PartitionGroup> globalGroups = partitionTable.getGlobalGroups();
@@ -1525,6 +1536,22 @@ public class MetaGroupMember extends RaftMember implements TSMetaService.AsyncIf
     }
   }
 
+  /**
+   * Rewrite a delete-timeseries plan so that it targets the full paths matched by its original
+   * paths. The given plan is modified in place and returned.
+   *
+   * @param plan the plan whose paths, as returned by getPathsStrings(), are to be resolved
+   * @return the same plan instance, now holding the resolved full paths
+   * @throws PathNotExistException if getMatchedPaths reports at least one original path in its
+   * second (invalid paths) list; the exception is built from a copy of that list
+   */
+  DeleteTimeSeriesPlan getDeleteTimeseriesPlanWithFullPaths(DeleteTimeSeriesPlan plan)
+      throws PathNotExistException {
+    Pair<List<String>, List<String>> matchResult = getMatchedPaths(plan.getPathsStrings());
+
+    // refuse the whole plan as soon as one of the original paths could not be matched
+    List<String> unmatchedPaths = matchResult.right;
+    if (!unmatchedPaths.isEmpty()) {
+      throw new PathNotExistException(new ArrayList<>(unmatchedPaths));
+    }
+
+    // wrap every matched path string into a Path and hand the new list to the plan
+    List<String> matchedPaths = matchResult.left;
+    List<Path> resolvedPaths = new ArrayList<>();
+    matchedPaths.forEach(matchedPath -> resolvedPaths.add(new Path(matchedPath)));
+    plan.setPaths(resolvedPaths);
+    return plan;
+  }
+
   /**
    * A partitioned plan (like batch insertion) will be split into several sub-plans, each belongs to
    * data group. And these sub-plans will be sent to and executed on the corresponding groups
@@ -2615,6 +2642,43 @@ public class MetaGroupMember extends RaftMember implements TSMetaService.AsyncIf
   }
 
   /**
+   * Get all paths after removing wildcards in the path
+   *
+   * @param originalPaths, a list of paths, potentially with wildcard
+   * @return a pair of path lists, the first are the existing full paths, the second are invalid
+   * original paths
+   */
+  public Pair<List<String>, List<String>> getMatchedPaths(List<String> originalPaths) {
+    ConcurrentSkipListSet<String> fullPaths = new ConcurrentSkipListSet<>();
+    ConcurrentSkipListSet<String> nonExistPaths = new ConcurrentSkipListSet<>();
+    ExecutorService getAllPathsService = Executors
+        .newFixedThreadPool(partitionTable.getGlobalGroups().size());
+    for (String pathStr : originalPaths) {
+      getAllPathsService.submit(() -> {
+        try {
+          List<String> fullPathStrs = getMatchedPaths(pathStr);
+          if (fullPathStrs.isEmpty()) {
+            nonExistPaths.add(pathStr);
+            logger.error("Path {} is not found.", pathStr);
+          }
+          fullPaths.addAll(fullPathStrs);
+        } catch (MetadataException e) {
+          logger.error("Failed to get full paths of the prefix path: {} because", pathStr, e);
+        }
+      });
+    }
+    getAllPathsService.shutdown();
+    try {
+      getAllPathsService.awaitTermination(RaftServer.getQueryTimeoutInSec(), TimeUnit.SECONDS);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      logger.error("Unexpected interruption when waiting for get all paths services to stop", e);
+    }
+    return new Pair<>(new ArrayList<>(fullPaths), new ArrayList<>(nonExistPaths));
+  }
+
+
+  /**
    * Get all devices after removing wildcards in the path
    *
    * @param originPath a path potentially with wildcard
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/DeleteTimeSeriesPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/DeleteTimeSeriesPlan.java
index 16b647a..97af723 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/DeleteTimeSeriesPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/DeleteTimeSeriesPlan.java
@@ -82,4 +82,8 @@ public class DeleteTimeSeriesPlan extends PhysicalPlan {
       deletePathList.add(new Path(readString(buffer)));
     }
   }
+
+  /**
+   * Replace the list of paths held by this plan.
+   *
+   * @param fullPaths the new path list; it is stored by reference, no copy is made
+   */
+  public void setPaths(List<Path> fullPaths) {
+    deletePathList = fullPaths;
+  }
 }