You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2022/03/17 07:47:18 UTC

[iotdb] 20/45: [RcoksDB] refine delete timeseries logic

This is an automated email from the ASF dual-hosted git repository.

jianyun pushed a commit to branch rocksdb/dev
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit c1d4303705ea01759f6e66e2a404b45ac94c2145
Author: chengjianyun <ch...@360.cn>
AuthorDate: Wed Mar 9 14:25:30 2022 +0800

    [RcoksDB] refine delete timeseries logic
---
 .../iotdb/db/metadata/rocksdb/MRocksDBManager.java | 159 ++++++++++-----------
 .../iotdb/db/metadata/rocksdb/RocksDBUtils.java    |  31 +---
 2 files changed, 82 insertions(+), 108 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java
index 7ae7e50..3c8523f 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java
@@ -20,7 +20,6 @@
 package org.apache.iotdb.db.metadata.rocksdb;
 
 import org.apache.iotdb.db.conf.IoTDBConfig;
-import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.metadata.AcquireLockTimeoutException;
 import org.apache.iotdb.db.exception.metadata.AliasAlreadyExistException;
@@ -122,17 +121,7 @@ import java.util.stream.Collectors;
 
 import static org.apache.iotdb.db.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
 import static org.apache.iotdb.db.conf.IoTDBConstant.ONE_LEVEL_PATH_WILDCARD;
-import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.DATA_BLOCK_TYPE_SCHEMA;
-import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.DEFAULT_ALIGNED_ENTITY_VALUE;
-import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.DEFAULT_NODE_VALUE;
-import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.FLAG_IS_ALIGNED;
-import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.FLAG_IS_SCHEMA;
-import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.FLAG_SET_TTL;
-import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.NODE_TYPE_ENTITY;
-import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.NODE_TYPE_MEASUREMENT;
-import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.NODE_TYPE_SG;
-import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.TABLE_NAME_TAGS;
-import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.ZERO;
+import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.*;
 import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.PATH_SEPARATOR;
 
 /**
@@ -641,89 +630,93 @@ public class MRocksDBManager implements IMetaManager {
   @Override
   public String deleteTimeseries(PartialPath pathPattern, boolean isPrefixMatch)
       throws MetadataException {
-    List<MeasurementPath> allTimeseries = getMeasurementPaths(pathPattern, isPrefixMatch);
-    if (allTimeseries.isEmpty()) {
-      // In the cluster mode, the deletion of a timeseries will be forwarded to all the nodes. For
-      // nodes that do not have the metadata of the timeseries, the coordinator expects a
-      // PathNotExistException.
-      throw new PathNotExistException(pathPattern.getFullPath());
-    }
-
     Set<String> failedNames = new HashSet<>();
-    for (PartialPath p : allTimeseries) {
-      try {
-        String[] nodes = p.getNodes();
-        if (nodes.length == 0 || !IoTDBConstant.PATH_ROOT.equals(nodes[0])) {
-          throw new IllegalPathException(p.getFullPath());
-        }
-
-        // Delete measurement node
-        String mLevelPath = RocksDBUtils.getLevelPath(p.getNodes(), p.getNodeLength() - 1);
-        Lock lock = locksPool.computeIfAbsent(mLevelPath, x -> new ReentrantLock());
-        RMeasurementMNode deletedNode;
-        if (lock.tryLock(MAX_LOCK_WAIT_TIME, TimeUnit.MILLISECONDS)) {
+    traverseOutcomeBasins(
+        pathPattern.getNodes(),
+        MAX_PATH_DEPTH,
+        (key, value) -> {
+          String path = null;
+          RMeasurementMNode deletedNode = null;
           try {
-            deletedNode = (RMeasurementMNode) getMeasurementMNode(p);
-            WriteBatch batch = new WriteBatch();
-            // delete the last node of path
-            byte[] mNodeKey = RocksDBUtils.toMeasurementNodeKey(mLevelPath);
-            batch.delete(mNodeKey);
-            if (deletedNode.getAlias() != null) {
-              batch.delete(RocksDBUtils.toAliasNodeKey(mLevelPath));
-            }
-            if (deletedNode.getTags() != null && !deletedNode.getTags().isEmpty()) {
-              batch.delete(readWriteHandler.getCFHByName(TABLE_NAME_TAGS), mNodeKey);
-              // TODO: tags invert index update
+            path = RocksDBUtils.getPathByInnerName(key);
+            PartialPath partialPath = new PartialPath(path);
+            String levelPath =
+                RocksDBUtils.getLevelPath(partialPath.getNodes(), partialPath.getNodeLength() - 1);
+            // Delete measurement node
+            Lock lock = locksPool.get(levelPath);
+        Lock lock = locksPool.computeIfAbsent(mLevelPath, x -> new ReentrantLock());
+            if (lock.tryLock(MAX_LOCK_WAIT_TIME, TimeUnit.MILLISECONDS)) {
+              try {
+                deletedNode = new RMeasurementMNode(path, value);
+                WriteBatch batch = new WriteBatch();
+                // delete the last node of path
+                batch.delete(key);
+                if (deletedNode.getAlias() != null) {
+                  String[] aliasNodes =
+                      Arrays.copyOf(partialPath.getNodes(), partialPath.getNodeLength());
+                  aliasNodes[partialPath.getNodeLength() - 1] = deletedNode.getAlias();
+                  String aliasLevelPath =
+                      RocksDBUtils.getLevelPath(aliasNodes, aliasNodes.length - 1);
+                  batch.delete(RocksDBUtils.toAliasNodeKey(aliasLevelPath));
+                }
+                if (deletedNode.getTags() != null && !deletedNode.getTags().isEmpty()) {
+                  batch.delete(readWriteHandler.getCFHByName(TABLE_NAME_TAGS), key);
+                  // TODO: tags invert index update
+                }
+                readWriteHandler.executeBatch(batch);
+              } finally {
+                lock.unlock();
+              }
+            } else {
+              throw new AcquireLockTimeoutException("acquire lock timeout, " + path);
             }
-            readWriteHandler.executeBatch(batch);
-          } finally {
-            lock.unlock();
+          } catch (Exception e) {
+            logger.error("delete timeseries [{}] fail", path, e);
+            failedNames.add(path);
           }
-        } else {
-          throw new AcquireLockTimeoutException("acquire lock timeout, " + p.getFullPath());
-        }
 
-        // delete parent node if is empty
-        IMNode curNode = deletedNode;
-        while (curNode != null) {
-          PartialPath curPath = curNode.getPartialPath();
-          String curLevelPath =
-              RocksDBUtils.getLevelPath(curPath.getNodes(), curPath.getNodeLength() - 1);
+          // delete parent node if is empty
+          IMNode parentNode = deletedNode.getParent();
           Lock curLock = locksPool.computeIfAbsent(curLevelPath, x -> new ReentrantLock());
-          if (curLock.tryLock(MAX_LOCK_WAIT_TIME, TimeUnit.MILLISECONDS)) {
-            try {
-              IMNode toDelete = curNode.getParent();
-              if (toDelete == null || toDelete.isStorageGroup()) {
+          try {
+            while (parentNode != null) {
+              // TODO: check children size
+              if (!parentNode.isEmptyInternal() || parentNode.isStorageGroup()) {
                 break;
-              }
-
-              if (toDelete.isEmptyInternal()) {
-                if (toDelete.isEntity()) {
-                  // TODO: aligned timeseries needs special check????
-                  readWriteHandler.deleteNode(
-                      toDelete.getPartialPath().getNodes(), RocksDBMNodeType.ENTITY);
+              } else {
+                PartialPath parentPath = parentNode.getPartialPath();
+                String parentLevelPath =
+                    RocksDBUtils.getLevelPath(
+                        parentPath.getNodes(), parentPath.getNodeLength() - 1);
+                Lock curLock = locksPool.get(parentLevelPath);
+                if (curLock.tryLock(MAX_LOCK_WAIT_TIME, TimeUnit.MILLISECONDS)) {
+                  try {
+                    if (parentNode.isEntity()) {
+                      // TODO: aligned timeseries needs special check????
+                      readWriteHandler.deleteNode(
+                          parentNode.getPartialPath().getNodes(), RocksDBMNodeType.ENTITY);
+                    } else {
+                      readWriteHandler.deleteNode(
+                          parentNode.getPartialPath().getNodes(), RocksDBMNodeType.INTERNAL);
+                    }
+                    parentNode = parentNode.getParent();
+                  } finally {
+                    curLock.unlock();
+                  }
                 } else {
-                  readWriteHandler.deleteNode(
-                      toDelete.getPartialPath().getNodes(), RocksDBMNodeType.INTERNAL);
+                  throw new AcquireLockTimeoutException("acquire lock timeout, " + parentLevelPath);
                 }
-                curNode = toDelete;
-              } else {
-                break;
               }
-            } finally {
-              curLock.unlock();
             }
-          } else {
-            throw new AcquireLockTimeoutException("acquire lock timeout, " + curNode.getFullPath());
+            // TODO: trigger engine update
+            // TODO: update totalTimeSeriesNumber
+          } catch (Exception e) {
+            logger.error("delete timeseries [{}] fail", parentNode.getFullPath(), e);
+            failedNames.add(parentNode.getFullPath());
           }
-        }
-        // TODO: trigger engine update
-        // TODO: update totalTimeSeriesNumber
-      } catch (Exception e) {
-        logger.error("delete timeseries [{}] fail", p.getFullPath(), e);
-        failedNames.add(p.getFullPath());
-      }
-    }
+          return true;
+        },
+        new Character[] {NODE_TYPE_MEASUREMENT});
     return failedNames.isEmpty() ? null : String.join(",", failedNames);
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/RocksDBUtils.java b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/RocksDBUtils.java
index 8ac3467..a1d37cb 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/RocksDBUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/RocksDBUtils.java
@@ -43,33 +43,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
-import static org.apache.iotdb.db.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
-import static org.apache.iotdb.db.conf.IoTDBConstant.ONE_LEVEL_PATH_WILDCARD;
-import static org.apache.iotdb.db.conf.IoTDBConstant.PATH_ROOT;
-import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.DATA_BLOCK_TYPE_ALIAS;
-import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.DATA_BLOCK_TYPE_ATTRIBUTES;
-import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.DATA_BLOCK_TYPE_ORIGIN_KEY;
-import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.DATA_BLOCK_TYPE_SCHEMA;
-import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.DATA_BLOCK_TYPE_TAGS;
-import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.DATA_BLOCK_TYPE_TTL;
-import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.DATA_VERSION;
-import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.DEFAULT_FLAG;
-import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.ESCAPE_PATH_SEPARATOR;
-import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.FLAG_HAS_ALIAS;
-import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.FLAG_HAS_ATTRIBUTES;
-import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.FLAG_HAS_TAGS;
+import static org.apache.iotdb.db.conf.IoTDBConstant.*;
 import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.FLAG_IS_ALIGNED;
-import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.FLAG_SET_TTL;
-import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.NODE_TYPE_ALIAS;
-import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.NODE_TYPE_ENTITY;
-import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.NODE_TYPE_INTERNAL;
-import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.NODE_TYPE_MEASUREMENT;
-import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.NODE_TYPE_SG;
 import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.PATH_SEPARATOR;
-import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.ROOT;
-import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.ROOT_CHAR;
-import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.ROOT_STRING;
-import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.ZERO;
+import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.*;
 
 public class RocksDBUtils {
 
@@ -198,6 +175,10 @@ public class RocksDBUtils {
       flag = (byte) (flag | FLAG_HAS_ATTRIBUTES);
     }
 
+    if (schema != null) {
+      flag = (byte) (flag | FLAG_IS_SCHEMA);
+    }
+
     ReadWriteIOUtils.write(flag, outputStream);
 
     if (schema != null) {