You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2022/03/17 07:47:09 UTC

[iotdb] 11/45: update the method of getting the schema

This is an automated email from the ASF dual-hosted git repository.

jianyun pushed a commit to branch rocksdb/dev
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit aeb546b00e3e0e9badc52c83eb9d778bd16221ae
Author: lisijia <li...@360.cn>
AuthorDate: Tue Mar 8 15:52:58 2022 +0800

    update the method of getting the schema
---
 .../iotdb/db/metadata/rocksdb/MRocksDBManager.java | 93 +++++++++++++++-------
 1 file changed, 63 insertions(+), 30 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java
index ff41b13..a705ae6 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java
@@ -113,13 +113,23 @@ import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Lock;
-import java.util.function.Consumer;
+import java.util.function.BiFunction;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import static org.apache.iotdb.db.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
 import static org.apache.iotdb.db.conf.IoTDBConstant.ONE_LEVEL_PATH_WILDCARD;
-import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.*;
+import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.DATA_BLOCK_TYPE_SCHEMA;
+import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.DEFAULT_ALIGNED_ENTITY_VALUE;
+import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.DEFAULT_NODE_VALUE;
+import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.FLAG_IS_ALIGNED;
+import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.FLAG_IS_SCHEMA;
+import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.FLAG_SET_TTL;
+import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.NODE_TYPE_ENTITY;
+import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.NODE_TYPE_MEASUREMENT;
+import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.NODE_TYPE_SG;
+import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.TABLE_NAME_TAGS;
+import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.ZERO;
 import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.PATH_SEPARATOR;
 
 /**
@@ -946,14 +956,17 @@ public class MRocksDBManager implements IMetaManager {
   }
 
   public void traverseOutcomeBasins(
-      String[] nodes, int maxLevel, Consumer<String> consumer, Character[] nodeTypeArray)
+      String[] nodes,
+      int maxLevel,
+      BiFunction<String, byte[], Boolean> function,
+      Character[] nodeTypeArray)
       throws IllegalPathException {
     List<String[]> allNodesArray = RocksDBUtils.replaceMultiWildcardToSingle(nodes, maxLevel);
-    allNodesArray.parallelStream().forEach(x -> traverseByPatternPath(x, consumer, nodeTypeArray));
+    allNodesArray.parallelStream().forEach(x -> traverseByPatternPath(x, function, nodeTypeArray));
   }
 
   public void traverseByPatternPath(
-      String[] nodes, Consumer<String> consumer, Character[] nodeTypeArray) {
+      String[] nodes, BiFunction<String, byte[], Boolean> function, Character[] nodeTypeArray) {
     //    String[] nodes = pathPattern.getNodes();
 
     int startIndex = 0;
@@ -968,12 +981,14 @@ public class MRocksDBManager implements IMetaManager {
                 String levelPrefix =
                     RocksDBUtils.convertPartialPathToInnerByNodes(nodes, nodes.length - 1, x);
                 try {
-                  if (readWriteHandler.keyExist(levelPrefix.getBytes())) {
+                  Holder<byte[]> holder = new Holder<>();
+                  readWriteHandler.keyExist(levelPrefix.getBytes(), holder);
+                  if (holder.getValue() != null) {
                     StringBuilder stringBuilder = new StringBuilder();
                     for (String node : nodes) {
                       stringBuilder.append(RockDBConstants.PATH_SEPARATOR).append(node);
                     }
-                    consumer.accept(stringBuilder.substring(1));
+                    function.apply(stringBuilder.substring(1), holder.getValue());
                   }
                 } catch (RocksDBException e) {
                   logger.error(e.getMessage());
@@ -1018,8 +1033,9 @@ public class MRocksDBManager implements IMetaManager {
                             }
                             if (RocksDBUtils.suffixMatch(iterator.key(), suffixToMatch)) {
                               if (lastIteration) {
-                                consumer.accept(
-                                    RocksDBUtils.getPathByInnerName(new String(iterator.key())));
+                                function.apply(
+                                    RocksDBUtils.getPathByInnerName(new String(iterator.key())),
+                                    iterator.value());
                               } else {
                                 tempNodes.add(RocksDBUtils.toMetaNodes(iterator.key()));
                               }
@@ -1152,9 +1168,12 @@ public class MRocksDBManager implements IMetaManager {
 
   private int getCountByNodeType(Character[] nodetype, String[] nodes) throws IllegalPathException {
     AtomicInteger atomicInteger = new AtomicInteger(0);
-    Consumer<String> consumer = s -> atomicInteger.incrementAndGet();
-
-    traverseOutcomeBasins(nodes, MAX_PATH_DEPTH, consumer, nodetype);
+    BiFunction<String, byte[], Boolean> function =
+        (a, b) -> {
+          atomicInteger.incrementAndGet();
+          return true;
+        };
+    traverseOutcomeBasins(nodes, MAX_PATH_DEPTH, function, nodetype);
     return atomicInteger.get();
   }
 
@@ -1408,9 +1427,12 @@ public class MRocksDBManager implements IMetaManager {
       String[] nodes, Character[] nodetype, Collection<PartialPath> collection)
       throws IllegalPathException {
     List<String> allResult = Collections.synchronizedList(new ArrayList<>());
-    Consumer<String> consumer = allResult::add;
-
-    traverseOutcomeBasins(nodes, MAX_PATH_DEPTH, consumer, nodetype);
+    BiFunction<String, byte[], Boolean> function =
+        (a, b) -> {
+          allResult.add(a);
+          return true;
+        };
+    traverseOutcomeBasins(nodes, MAX_PATH_DEPTH, function, nodetype);
 
     for (String path : allResult) {
       collection.add(new PartialPath(path));
@@ -1525,20 +1547,7 @@ public class MRocksDBManager implements IMetaManager {
   @Override
   public List<MeasurementPath> getMeasurementPaths(PartialPath pathPattern, boolean isPrefixMatch)
       throws MetadataException {
-    List<MeasurementPath> result = new ArrayList<>();
-    Map<String, byte[]> allMeasurement =
-        getKeyNumByPrefix(pathPattern, NODE_TYPE_MEASUREMENT, isPrefixMatch);
-    for (Entry<String, byte[]> entry : allMeasurement.entrySet()) {
-      try {
-        MeasurementSchema schema =
-            (MeasurementSchema) RocksDBUtils.parseNodeValue(entry.getValue(), FLAG_IS_SCHEMA);
-        PartialPath path = new PartialPath(RocksDBUtils.getPathByInnerName(entry.getKey()));
-        result.add(new MeasurementPath(path, schema));
-      } catch (ClassCastException e) {
-        throw new MetadataException(e);
-      }
-    }
-    return result;
+    return getMatchedMeasurementPath(pathPattern.getNodes());
   }
 
   /**
@@ -1565,7 +1574,31 @@ public class MRocksDBManager implements IMetaManager {
   public Pair<List<MeasurementPath>, Integer> getMeasurementPathsWithAlias(
       PartialPath pathPattern, int limit, int offset, boolean isPrefixMatch)
       throws MetadataException {
-    return null;
+    // TODO: limit/offset are not applied to the result list yet; offset + limit is a placeholder
+    Pair<List<MeasurementPath>, Integer> result =
+        new Pair<>(getMatchedMeasurementPath(pathPattern.getNodes()), offset + limit);
+    return result;
+  }
+
+  private List<MeasurementPath> getMatchedMeasurementPath(String[] nodes)
+      throws IllegalPathException {
+    List<MeasurementPath> allResult = Collections.synchronizedList(new ArrayList<>());
+    BiFunction<String, byte[], Boolean> function =
+        (a, b) -> {
+          try {
+            allResult.add(
+                new MeasurementPath(
+                    new PartialPath(a),
+                    (MeasurementSchema) RocksDBUtils.parseNodeValue(b, FLAG_IS_SCHEMA)));
+            return true;
+          } catch (IllegalPathException e) {
+            logger.error(e.getMessage());
+            return false;
+          }
+        };
+    traverseOutcomeBasins(nodes, MAX_PATH_DEPTH, function, new Character[] {NODE_TYPE_MEASUREMENT});
+
+    return allResult;
   }
 
   @Override