You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2022/03/17 07:47:09 UTC
[iotdb] 11/45: update the method of getting the schema
This is an automated email from the ASF dual-hosted git repository.
jianyun pushed a commit to branch rocksdb/dev
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit aeb546b00e3e0e9badc52c83eb9d778bd16221ae
Author: lisijia <li...@360.cn>
AuthorDate: Tue Mar 8 15:52:58 2022 +0800
update the method of getting the schema
---
.../iotdb/db/metadata/rocksdb/MRocksDBManager.java | 93 +++++++++++++++-------
1 file changed, 63 insertions(+), 30 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java
index ff41b13..a705ae6 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java
@@ -113,13 +113,23 @@ import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
-import java.util.function.Consumer;
+import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
import static org.apache.iotdb.db.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
import static org.apache.iotdb.db.conf.IoTDBConstant.ONE_LEVEL_PATH_WILDCARD;
-import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.*;
+import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.DATA_BLOCK_TYPE_SCHEMA;
+import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.DEFAULT_ALIGNED_ENTITY_VALUE;
+import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.DEFAULT_NODE_VALUE;
+import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.FLAG_IS_ALIGNED;
+import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.FLAG_IS_SCHEMA;
+import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.FLAG_SET_TTL;
+import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.NODE_TYPE_ENTITY;
+import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.NODE_TYPE_MEASUREMENT;
+import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.NODE_TYPE_SG;
+import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.TABLE_NAME_TAGS;
+import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.ZERO;
import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.PATH_SEPARATOR;
/**
@@ -946,14 +956,17 @@ public class MRocksDBManager implements IMetaManager {
}
public void traverseOutcomeBasins(
- String[] nodes, int maxLevel, Consumer<String> consumer, Character[] nodeTypeArray)
+ String[] nodes,
+ int maxLevel,
+ BiFunction<String, byte[], Boolean> function,
+ Character[] nodeTypeArray)
throws IllegalPathException {
List<String[]> allNodesArray = RocksDBUtils.replaceMultiWildcardToSingle(nodes, maxLevel);
- allNodesArray.parallelStream().forEach(x -> traverseByPatternPath(x, consumer, nodeTypeArray));
+ allNodesArray.parallelStream().forEach(x -> traverseByPatternPath(x, function, nodeTypeArray));
}
public void traverseByPatternPath(
- String[] nodes, Consumer<String> consumer, Character[] nodeTypeArray) {
+ String[] nodes, BiFunction<String, byte[], Boolean> function, Character[] nodeTypeArray) {
// String[] nodes = pathPattern.getNodes();
int startIndex = 0;
@@ -968,12 +981,14 @@ public class MRocksDBManager implements IMetaManager {
String levelPrefix =
RocksDBUtils.convertPartialPathToInnerByNodes(nodes, nodes.length - 1, x);
try {
- if (readWriteHandler.keyExist(levelPrefix.getBytes())) {
+ Holder<byte[]> holder = new Holder<>();
+ readWriteHandler.keyExist(levelPrefix.getBytes(), holder);
+ if (holder.getValue() != null) {
StringBuilder stringBuilder = new StringBuilder();
for (String node : nodes) {
stringBuilder.append(RockDBConstants.PATH_SEPARATOR).append(node);
}
- consumer.accept(stringBuilder.substring(1));
+ function.apply(stringBuilder.substring(1), holder.getValue());
}
} catch (RocksDBException e) {
logger.error(e.getMessage());
@@ -1018,8 +1033,9 @@ public class MRocksDBManager implements IMetaManager {
}
if (RocksDBUtils.suffixMatch(iterator.key(), suffixToMatch)) {
if (lastIteration) {
- consumer.accept(
- RocksDBUtils.getPathByInnerName(new String(iterator.key())));
+ function.apply(
+ RocksDBUtils.getPathByInnerName(new String(iterator.key())),
+ iterator.value());
} else {
tempNodes.add(RocksDBUtils.toMetaNodes(iterator.key()));
}
@@ -1152,9 +1168,12 @@ public class MRocksDBManager implements IMetaManager {
private int getCountByNodeType(Character[] nodetype, String[] nodes) throws IllegalPathException {
AtomicInteger atomicInteger = new AtomicInteger(0);
- Consumer<String> consumer = s -> atomicInteger.incrementAndGet();
-
- traverseOutcomeBasins(nodes, MAX_PATH_DEPTH, consumer, nodetype);
+ BiFunction<String, byte[], Boolean> function =
+ (a, b) -> {
+ atomicInteger.incrementAndGet();
+ return true;
+ };
+ traverseOutcomeBasins(nodes, MAX_PATH_DEPTH, function, nodetype);
return atomicInteger.get();
}
@@ -1408,9 +1427,12 @@ public class MRocksDBManager implements IMetaManager {
String[] nodes, Character[] nodetype, Collection<PartialPath> collection)
throws IllegalPathException {
List<String> allResult = Collections.synchronizedList(new ArrayList<>());
- Consumer<String> consumer = allResult::add;
-
- traverseOutcomeBasins(nodes, MAX_PATH_DEPTH, consumer, nodetype);
+ BiFunction<String, byte[], Boolean> function =
+ (a, b) -> {
+ allResult.add(a);
+ return true;
+ };
+ traverseOutcomeBasins(nodes, MAX_PATH_DEPTH, function, nodetype);
for (String path : allResult) {
collection.add(new PartialPath(path));
@@ -1525,20 +1547,7 @@ public class MRocksDBManager implements IMetaManager {
@Override
public List<MeasurementPath> getMeasurementPaths(PartialPath pathPattern, boolean isPrefixMatch)
throws MetadataException {
- List<MeasurementPath> result = new ArrayList<>();
- Map<String, byte[]> allMeasurement =
- getKeyNumByPrefix(pathPattern, NODE_TYPE_MEASUREMENT, isPrefixMatch);
- for (Entry<String, byte[]> entry : allMeasurement.entrySet()) {
- try {
- MeasurementSchema schema =
- (MeasurementSchema) RocksDBUtils.parseNodeValue(entry.getValue(), FLAG_IS_SCHEMA);
- PartialPath path = new PartialPath(RocksDBUtils.getPathByInnerName(entry.getKey()));
- result.add(new MeasurementPath(path, schema));
- } catch (ClassCastException e) {
- throw new MetadataException(e);
- }
- }
- return result;
+ return getMatchedMeasurementPath(pathPattern.getNodes());
}
/**
@@ -1565,7 +1574,31 @@ public class MRocksDBManager implements IMetaManager {
public Pair<List<MeasurementPath>, Integer> getMeasurementPathsWithAlias(
PartialPath pathPattern, int limit, int offset, boolean isPrefixMatch)
throws MetadataException {
- return null;
+ // todo update offset
+ Pair<List<MeasurementPath>, Integer> result =
+ new Pair<>(getMatchedMeasurementPath(pathPattern.getNodes()), offset + limit);
+ return result;
+ }
+
+ private List<MeasurementPath> getMatchedMeasurementPath(String[] nodes)
+ throws IllegalPathException {
+ List<MeasurementPath> allResult = Collections.synchronizedList(new ArrayList<>());
+ BiFunction<String, byte[], Boolean> function =
+ (a, b) -> {
+ try {
+ allResult.add(
+ new MeasurementPath(
+ new PartialPath(a),
+ (MeasurementSchema) RocksDBUtils.parseNodeValue(b, FLAG_IS_SCHEMA)));
+ return true;
+ } catch (IllegalPathException e) {
+ logger.error(e.getMessage());
+ return false;
+ }
+ };
+ traverseOutcomeBasins(nodes, MAX_PATH_DEPTH, function, new Character[] {NODE_TYPE_MEASUREMENT});
+
+ return allResult;
}
@Override