You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2022/03/17 07:47:18 UTC
[iotdb] 20/45: [RcoksDB] refine delete timeseries logic
This is an automated email from the ASF dual-hosted git repository.
jianyun pushed a commit to branch rocksdb/dev
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit c1d4303705ea01759f6e66e2a404b45ac94c2145
Author: chengjianyun <ch...@360.cn>
AuthorDate: Wed Mar 9 14:25:30 2022 +0800
[RcoksDB] refine delete timeseries logic
---
.../iotdb/db/metadata/rocksdb/MRocksDBManager.java | 159 ++++++++++-----------
.../iotdb/db/metadata/rocksdb/RocksDBUtils.java | 31 +---
2 files changed, 82 insertions(+), 108 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java
index 7ae7e50..3c8523f 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java
@@ -20,7 +20,6 @@
package org.apache.iotdb.db.metadata.rocksdb;
import org.apache.iotdb.db.conf.IoTDBConfig;
-import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.metadata.AcquireLockTimeoutException;
import org.apache.iotdb.db.exception.metadata.AliasAlreadyExistException;
@@ -122,17 +121,7 @@ import java.util.stream.Collectors;
import static org.apache.iotdb.db.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
import static org.apache.iotdb.db.conf.IoTDBConstant.ONE_LEVEL_PATH_WILDCARD;
-import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.DATA_BLOCK_TYPE_SCHEMA;
-import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.DEFAULT_ALIGNED_ENTITY_VALUE;
-import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.DEFAULT_NODE_VALUE;
-import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.FLAG_IS_ALIGNED;
-import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.FLAG_IS_SCHEMA;
-import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.FLAG_SET_TTL;
-import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.NODE_TYPE_ENTITY;
-import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.NODE_TYPE_MEASUREMENT;
-import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.NODE_TYPE_SG;
-import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.TABLE_NAME_TAGS;
-import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.ZERO;
+import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.*;
import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.PATH_SEPARATOR;
/**
@@ -641,89 +630,93 @@ public class MRocksDBManager implements IMetaManager {
@Override
public String deleteTimeseries(PartialPath pathPattern, boolean isPrefixMatch)
throws MetadataException {
- List<MeasurementPath> allTimeseries = getMeasurementPaths(pathPattern, isPrefixMatch);
- if (allTimeseries.isEmpty()) {
- // In the cluster mode, the deletion of a timeseries will be forwarded to all the nodes. For
- // nodes that do not have the metadata of the timeseries, the coordinator expects a
- // PathNotExistException.
- throw new PathNotExistException(pathPattern.getFullPath());
- }
-
Set<String> failedNames = new HashSet<>();
- for (PartialPath p : allTimeseries) {
- try {
- String[] nodes = p.getNodes();
- if (nodes.length == 0 || !IoTDBConstant.PATH_ROOT.equals(nodes[0])) {
- throw new IllegalPathException(p.getFullPath());
- }
-
- // Delete measurement node
- String mLevelPath = RocksDBUtils.getLevelPath(p.getNodes(), p.getNodeLength() - 1);
- Lock lock = locksPool.computeIfAbsent(mLevelPath, x -> new ReentrantLock());
- RMeasurementMNode deletedNode;
- if (lock.tryLock(MAX_LOCK_WAIT_TIME, TimeUnit.MILLISECONDS)) {
+ traverseOutcomeBasins(
+ pathPattern.getNodes(),
+ MAX_PATH_DEPTH,
+ (key, value) -> {
+ String path = null;
+ RMeasurementMNode deletedNode = null;
try {
- deletedNode = (RMeasurementMNode) getMeasurementMNode(p);
- WriteBatch batch = new WriteBatch();
- // delete the last node of path
- byte[] mNodeKey = RocksDBUtils.toMeasurementNodeKey(mLevelPath);
- batch.delete(mNodeKey);
- if (deletedNode.getAlias() != null) {
- batch.delete(RocksDBUtils.toAliasNodeKey(mLevelPath));
- }
- if (deletedNode.getTags() != null && !deletedNode.getTags().isEmpty()) {
- batch.delete(readWriteHandler.getCFHByName(TABLE_NAME_TAGS), mNodeKey);
- // TODO: tags invert index update
+ path = RocksDBUtils.getPathByInnerName(key);
+ PartialPath partialPath = new PartialPath(path);
+ String levelPath =
+ RocksDBUtils.getLevelPath(partialPath.getNodes(), partialPath.getNodeLength() - 1);
+ // Delete measurement node
+ Lock lock = locksPool.get(levelPath);
+ Lock lock = locksPool.computeIfAbsent(mLevelPath, x -> new ReentrantLock());
+ if (lock.tryLock(MAX_LOCK_WAIT_TIME, TimeUnit.MILLISECONDS)) {
+ try {
+ deletedNode = new RMeasurementMNode(path, value);
+ WriteBatch batch = new WriteBatch();
+ // delete the last node of path
+ batch.delete(key);
+ if (deletedNode.getAlias() != null) {
+ String[] aliasNodes =
+ Arrays.copyOf(partialPath.getNodes(), partialPath.getNodeLength());
+ aliasNodes[partialPath.getNodeLength() - 1] = deletedNode.getAlias();
+ String aliasLevelPath =
+ RocksDBUtils.getLevelPath(aliasNodes, aliasNodes.length - 1);
+ batch.delete(RocksDBUtils.toAliasNodeKey(aliasLevelPath));
+ }
+ if (deletedNode.getTags() != null && !deletedNode.getTags().isEmpty()) {
+ batch.delete(readWriteHandler.getCFHByName(TABLE_NAME_TAGS), key);
+ // TODO: tags invert index update
+ }
+ readWriteHandler.executeBatch(batch);
+ } finally {
+ lock.unlock();
+ }
+ } else {
+ throw new AcquireLockTimeoutException("acquire lock timeout, " + path);
}
- readWriteHandler.executeBatch(batch);
- } finally {
- lock.unlock();
+ } catch (Exception e) {
+ logger.error("delete timeseries [{}] fail", path, e);
+ failedNames.add(path);
}
- } else {
- throw new AcquireLockTimeoutException("acquire lock timeout, " + p.getFullPath());
- }
- // delete parent node if is empty
- IMNode curNode = deletedNode;
- while (curNode != null) {
- PartialPath curPath = curNode.getPartialPath();
- String curLevelPath =
- RocksDBUtils.getLevelPath(curPath.getNodes(), curPath.getNodeLength() - 1);
+ // delete parent node if is empty
+ IMNode parentNode = deletedNode.getParent();
Lock curLock = locksPool.computeIfAbsent(curLevelPath, x -> new ReentrantLock());
- if (curLock.tryLock(MAX_LOCK_WAIT_TIME, TimeUnit.MILLISECONDS)) {
- try {
- IMNode toDelete = curNode.getParent();
- if (toDelete == null || toDelete.isStorageGroup()) {
+ try {
+ while (parentNode != null) {
+ // TODO: check children size
+ if (!parentNode.isEmptyInternal() || parentNode.isStorageGroup()) {
break;
- }
-
- if (toDelete.isEmptyInternal()) {
- if (toDelete.isEntity()) {
- // TODO: aligned timeseries needs special check????
- readWriteHandler.deleteNode(
- toDelete.getPartialPath().getNodes(), RocksDBMNodeType.ENTITY);
+ } else {
+ PartialPath parentPath = parentNode.getPartialPath();
+ String parentLevelPath =
+ RocksDBUtils.getLevelPath(
+ parentPath.getNodes(), parentPath.getNodeLength() - 1);
+ Lock curLock = locksPool.get(parentLevelPath);
+ if (curLock.tryLock(MAX_LOCK_WAIT_TIME, TimeUnit.MILLISECONDS)) {
+ try {
+ if (parentNode.isEntity()) {
+ // TODO: aligned timeseries needs special check????
+ readWriteHandler.deleteNode(
+ parentNode.getPartialPath().getNodes(), RocksDBMNodeType.ENTITY);
+ } else {
+ readWriteHandler.deleteNode(
+ parentNode.getPartialPath().getNodes(), RocksDBMNodeType.INTERNAL);
+ }
+ parentNode = parentNode.getParent();
+ } finally {
+ curLock.unlock();
+ }
} else {
- readWriteHandler.deleteNode(
- toDelete.getPartialPath().getNodes(), RocksDBMNodeType.INTERNAL);
+ throw new AcquireLockTimeoutException("acquire lock timeout, " + parentLevelPath);
}
- curNode = toDelete;
- } else {
- break;
}
- } finally {
- curLock.unlock();
}
- } else {
- throw new AcquireLockTimeoutException("acquire lock timeout, " + curNode.getFullPath());
+ // TODO: trigger engine update
+ // TODO: update totalTimeSeriesNumber
+ } catch (Exception e) {
+ logger.error("delete timeseries [{}] fail", parentNode.getFullPath(), e);
+ failedNames.add(parentNode.getFullPath());
}
- }
- // TODO: trigger engine update
- // TODO: update totalTimeSeriesNumber
- } catch (Exception e) {
- logger.error("delete timeseries [{}] fail", p.getFullPath(), e);
- failedNames.add(p.getFullPath());
- }
- }
+ return true;
+ },
+ new Character[] {NODE_TYPE_MEASUREMENT});
return failedNames.isEmpty() ? null : String.join(",", failedNames);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/RocksDBUtils.java b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/RocksDBUtils.java
index 8ac3467..a1d37cb 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/RocksDBUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/RocksDBUtils.java
@@ -43,33 +43,10 @@ import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
-import static org.apache.iotdb.db.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
-import static org.apache.iotdb.db.conf.IoTDBConstant.ONE_LEVEL_PATH_WILDCARD;
-import static org.apache.iotdb.db.conf.IoTDBConstant.PATH_ROOT;
-import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.DATA_BLOCK_TYPE_ALIAS;
-import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.DATA_BLOCK_TYPE_ATTRIBUTES;
-import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.DATA_BLOCK_TYPE_ORIGIN_KEY;
-import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.DATA_BLOCK_TYPE_SCHEMA;
-import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.DATA_BLOCK_TYPE_TAGS;
-import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.DATA_BLOCK_TYPE_TTL;
-import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.DATA_VERSION;
-import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.DEFAULT_FLAG;
-import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.ESCAPE_PATH_SEPARATOR;
-import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.FLAG_HAS_ALIAS;
-import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.FLAG_HAS_ATTRIBUTES;
-import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.FLAG_HAS_TAGS;
+import static org.apache.iotdb.db.conf.IoTDBConstant.*;
import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.FLAG_IS_ALIGNED;
-import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.FLAG_SET_TTL;
-import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.NODE_TYPE_ALIAS;
-import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.NODE_TYPE_ENTITY;
-import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.NODE_TYPE_INTERNAL;
-import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.NODE_TYPE_MEASUREMENT;
-import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.NODE_TYPE_SG;
import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.PATH_SEPARATOR;
-import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.ROOT;
-import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.ROOT_CHAR;
-import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.ROOT_STRING;
-import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.ZERO;
+import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.*;
public class RocksDBUtils {
@@ -198,6 +175,10 @@ public class RocksDBUtils {
flag = (byte) (flag | FLAG_HAS_ATTRIBUTES);
}
+ if (schema != null) {
+ flag = (byte) (flag | FLAG_IS_SCHEMA);
+ }
+
ReadWriteIOUtils.write(flag, outputStream);
if (schema != null) {