You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by wa...@apache.org on 2022/04/21 09:04:26 UTC

[iotdb] branch master updated: [rocksdb] updated the interface support (#5625)

This is an automated email from the ASF dual-hosted git repository.

wangchao316 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 2df96fee1e [rocksdb] updated the interface support (#5625)
2df96fee1e is described below

commit 2df96fee1e7a0d76121c2a6430af6ffc102030b4
Author: lisijia <44...@users.noreply.github.com>
AuthorDate: Thu Apr 21 17:04:21 2022 +0800

    [rocksdb] updated the interface support (#5625)
    
    [rocksdb] updated the interface support (#5625)
---
 .../Data-Modeling/SchemaRegion-rocksdb.md          |   6 +-
 .../Data-Modeling/SchemaRegion-rocksdb.md          |   6 +-
 .../schemaregion/rocksdb/RSchemaRegion.java        | 123 ++++++++++++++-------
 .../schemaregion/rocksdb/MRocksDBUnitTest.java     |  26 +++++
 4 files changed, 113 insertions(+), 48 deletions(-)

diff --git a/docs/UserGuide/Data-Modeling/SchemaRegion-rocksdb.md b/docs/UserGuide/Data-Modeling/SchemaRegion-rocksdb.md
index f8a1cfb099..afa3daa150 100644
--- a/docs/UserGuide/Data-Modeling/SchemaRegion-rocksdb.md
+++ b/docs/UserGuide/Data-Modeling/SchemaRegion-rocksdb.md
@@ -79,8 +79,8 @@ The internal interface, that is, the invocation logic of other modules within th
 | getAllTimeseriesCount | internal | yes | |
 | getDevicesNum | internal | yes | |
 | getNodesCountInGivenLevel | internal | conditional support | path does not support wildcard |
-| getMeasurementCountGroupByLevel | internal | no | |
-| getNodesListInGivenLevel | internal | part of the support |specified path is not supported |
+| getMeasurementCountGroupByLevel | internal | yes | |
+| getNodesListInGivenLevel | internal | conditional support | path does not support wildcard |
 | getChildNodePathInNextLevel | internal | conditional support | path does not support wildcard |
 | getChildNodeNameInNextLevel | internal | conditional support | path does not support wildcard |
 | getBelongedDevices | internal | yes | |
@@ -90,6 +90,4 @@ The internal interface, that is, the invocation logic of other modules within th
 | getAllMeasurementByDevicePath | internal | yes | |
 | getDeviceNode | internal | yes | |
 | getMeasurementMNodes | internal | yes | |
-| collectMeasurementSchema | internal | no | |
-| collectTimeseriesSchema | internal | no | |
 | getSeriesSchemasAndReadLockDevice | internal | yes | |
\ No newline at end of file
diff --git a/docs/zh/UserGuide/Data-Modeling/SchemaRegion-rocksdb.md b/docs/zh/UserGuide/Data-Modeling/SchemaRegion-rocksdb.md
index 28298703b7..f4d3f833b2 100644
--- a/docs/zh/UserGuide/Data-Modeling/SchemaRegion-rocksdb.md
+++ b/docs/zh/UserGuide/Data-Modeling/SchemaRegion-rocksdb.md
@@ -79,8 +79,8 @@ schema_engine_mode=Rocksdb_based
 | getAllTimeseriesCount | 内部接口 | 支持 | |
 | getDevicesNum | 内部接口 | 支持 | |
 | getNodesCountInGivenLevel | 内部接口 | 有条件支持 | 路径不支持通配 |
-| getMeasurementCountGroupByLevel | 内部接口 | 不支持 | |
-| getNodesListInGivenLevel | 内部接口 | 部分支持 | 暂不支持指定路径 |
+| getMeasurementCountGroupByLevel | 内部接口 | 支持 | |
+| getNodesListInGivenLevel | 内部接口 | 有条件支持 | 路径不支持通配 |
 | getChildNodePathInNextLevel | 内部接口 | 有条件支持 | 路径不支持通配 |
 | getChildNodeNameInNextLevel | 内部接口 | 有条件支持 | 路径不支持通配 |
 | getBelongedDevices | 内部接口 | 支持 | |
@@ -90,6 +90,4 @@ schema_engine_mode=Rocksdb_based
 | getAllMeasurementByDevicePath | 内部接口 | 支持 | |
 | getDeviceNode | 内部接口 | 支持 | |
 | getMeasurementMNodes | 内部接口 | 支持 | |
-| collectMeasurementSchema | 内部接口 | 不支持 | |
-| collectTimeseriesSchema | 内部接口 | 不支持 | |
 | getSeriesSchemasAndReadLockDevice | 内部接口 | 支持 | |
\ No newline at end of file
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaRegion.java b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaRegion.java
index 4ad39d2f2b..4a604ee5de 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaRegion.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaRegion.java
@@ -120,14 +120,15 @@ import static org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaConstants
 import static org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaConstants.DEFAULT_ALIGNED_ENTITY_VALUE;
 import static org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaConstants.DEFAULT_NODE_VALUE;
 import static org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaConstants.FLAG_IS_ALIGNED;
+import static org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaConstants.NODE_TYPE_ALIAS;
 import static org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaConstants.NODE_TYPE_ENTITY;
 import static org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaConstants.NODE_TYPE_MEASUREMENT;
+import static org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaConstants.ROOT_STRING;
 import static org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaConstants.TABLE_NAME_TAGS;
-import static org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaConstants.ZERO;
 import static org.apache.iotdb.db.utils.EncodingInferenceUtils.getDefaultEncoding;
-import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.PATH_SEPARATOR;
 
 public class RSchemaRegion implements ISchemaRegion {
+
   private static final Logger logger = LoggerFactory.getLogger(RSchemaRegion.class);
 
   protected static IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
@@ -867,7 +868,7 @@ public class RSchemaRegion implements ISchemaRegion {
       throws MetadataException {
     // todo support wildcard
     if (pathPattern.getFullPath().contains(ONE_LEVEL_PATH_WILDCARD)) {
-      throw new MetadataException(
+      throw new UnsupportedOperationException(
           "Wildcards are not currently supported for this operation"
               + " [COUNT NODES pathPattern].");
     }
@@ -894,51 +895,92 @@ public class RSchemaRegion implements ISchemaRegion {
   @Override
   public Map<PartialPath, Integer> getMeasurementCountGroupByLevel(
       PartialPath pathPattern, int level, boolean isPrefixMatch) throws MetadataException {
-    throw new UnsupportedOperationException();
+    Map<PartialPath, Integer> result = new ConcurrentHashMap<>();
+    BiFunction<byte[], byte[], Boolean> function =
+        (a, b) -> {
+          String key = new String(a);
+          String partialName = splitToPartialNameByLevel(key, level);
+          if (partialName != null) {
+            PartialPath path = null;
+            try {
+              path = new PartialPath(partialName);
+            } catch (IllegalPathException e) {
+              logger.warn(e.getMessage());
+            }
+            result.putIfAbsent(path, 0);
+            result.put(path, result.get(path) + 1);
+          }
+          return true;
+        };
+    traverseOutcomeBasins(
+        pathPattern.getNodes(), MAX_PATH_DEPTH, function, new Character[] {NODE_TYPE_MEASUREMENT});
+
+    return result;
+  }
+
+  private String splitToPartialNameByLevel(String innerName, int level) {
+    StringBuilder stringBuilder = new StringBuilder(ROOT_STRING);
+    boolean currentIsFlag;
+    boolean lastIsFlag = false;
+    int j = 0;
+    for (int i = 0; i < innerName.length() && j <= level; i++) {
+      currentIsFlag = innerName.charAt(i) == '.';
+      if (currentIsFlag) {
+        j++;
+        currentIsFlag = true;
+      }
+      if (j <= 0 || lastIsFlag || (currentIsFlag && j > level)) {
+        lastIsFlag = false;
+        continue;
+      }
+      stringBuilder.append(innerName.charAt(i));
+      lastIsFlag = currentIsFlag;
+    }
+    if (j < level) {
+      return null;
+    }
+    return stringBuilder.toString();
   }
 
   @Override
   public List<PartialPath> getNodesListInGivenLevel(
       PartialPath pathPattern, int nodeLevel, boolean isPrefixMatch, StorageGroupFilter filter)
       throws MetadataException {
+    if (pathPattern.getFullPath().contains(ONE_LEVEL_PATH_WILDCARD)) {
+      throw new UnsupportedOperationException(
+          formatNotSupportInfo(Thread.currentThread().getStackTrace()[1].getMethodName()));
+    }
     return getNodesListInGivenLevel(pathPattern, nodeLevel);
   }
 
-  public List<PartialPath> getNodesListInGivenLevel(PartialPath pathPattern, int nodeLevel)
-      throws MetadataException {
-    // TODO: ignore pathPattern with *, all nodeLevel are start from "root.*"
-    List<PartialPath> results = new ArrayList<>();
-    if (nodeLevel == 0) {
-      results.add(new PartialPath(RSchemaConstants.ROOT));
-      return results;
-    }
-    // TODO: level one usually only contains small numbers, query in serialize
-    Set<String> paths;
-    StringBuilder builder = new StringBuilder();
-    if (nodeLevel <= 5) {
-      char level = (char) (ZERO + nodeLevel);
-      String prefix =
-          builder.append(RSchemaConstants.ROOT).append(PATH_SEPARATOR).append(level).toString();
-      paths = readWriteHandler.getAllByPrefix(prefix);
-    } else {
-      paths = ConcurrentHashMap.newKeySet();
-      char upperLevel = (char) (ZERO + nodeLevel - 1);
-      String prefix =
-          builder
-              .append(RSchemaConstants.ROOT)
-              .append(PATH_SEPARATOR)
-              .append(upperLevel)
-              .toString();
-      Set<String> parentPaths = readWriteHandler.getAllByPrefix(prefix);
-      parentPaths
-          .parallelStream()
-          .forEach(
-              x -> {
-                String targetPrefix = RSchemaUtils.getNextLevelOfPath(x, upperLevel);
-                paths.addAll(readWriteHandler.getAllByPrefix(targetPrefix));
-              });
-    }
-    return RSchemaUtils.convertToPartialPath(paths, nodeLevel);
+  private String formatNotSupportInfo(String methodName) {
+    return String.format("[%s] is not currently supported!", methodName);
+  }
+
+  private List<PartialPath> getNodesListInGivenLevel(PartialPath pathPattern, int nodeLevel) {
+    List<PartialPath> result = Collections.synchronizedList(new ArrayList<>());
+    Arrays.stream(ALL_NODE_TYPE_ARRAY)
+        .forEach(
+            x -> {
+              if (x == NODE_TYPE_ALIAS) {
+                return;
+              }
+              String innerName =
+                  RSchemaUtils.convertPartialPathToInnerByNodes(
+                      pathPattern.getNodes(), nodeLevel, x);
+              readWriteHandler
+                  .getAllByPrefix(innerName)
+                  .forEach(
+                      resultByPrefix -> {
+                        try {
+                          result.add(
+                              new PartialPath(RSchemaUtils.getPathByInnerName(resultByPrefix)));
+                        } catch (IllegalPathException e) {
+                          logger.warn(e.getMessage());
+                        }
+                      });
+            });
+    return result;
   }
 
   @Override
@@ -1080,7 +1122,8 @@ public class RSchemaRegion implements ISchemaRegion {
   private Pair<List<ShowTimeSeriesResult>, Integer> showTimeseriesWithIndex(
       ShowTimeSeriesPlan plan, QueryContext context) {
     // temporarily unsupported
-    throw new UnsupportedOperationException("temporarily unsupported : showTimeseriesWithIndex");
+    throw new UnsupportedOperationException(
+        formatNotSupportInfo(Thread.currentThread().getStackTrace()[1].getMethodName()));
   }
 
   private Pair<List<ShowTimeSeriesResult>, Integer> showTimeseriesWithoutIndex(
diff --git a/server/src/test/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/MRocksDBUnitTest.java b/server/src/test/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/MRocksDBUnitTest.java
index f2ce896406..9c65bf3f31 100644
--- a/server/src/test/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/MRocksDBUnitTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/MRocksDBUnitTest.java
@@ -41,6 +41,7 @@ import org.junit.Test;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 
 @Ignore
 public class MRocksDBUnitTest {
@@ -232,6 +233,31 @@ public class MRocksDBUnitTest {
     Assert.assertEquals(m3.getAlias(), "test");
   }
 
+  @Test
+  public void testGetMeasurementCountGroupByLevel() throws MetadataException {
+    PartialPath path1 = new PartialPath("root.test.sg.dd.m1");
+    rSchemaRegion.createTimeseries(
+        path1, TSDataType.DOUBLE, TSEncoding.PLAIN, CompressionType.GZIP, null, "ma");
+    PartialPath path2 = new PartialPath("root.test.sg.dd.m2");
+    rSchemaRegion.createTimeseries(
+        path2, TSDataType.DOUBLE, TSEncoding.PLAIN, CompressionType.GZIP, null, null);
+    PartialPath path3 = new PartialPath("root.test.sg.dd.m3");
+    rSchemaRegion.createTimeseries(
+        path3, TSDataType.DOUBLE, TSEncoding.PLAIN, CompressionType.GZIP, null, null);
+    PartialPath path4 = new PartialPath("root.test.sg.m4");
+    rSchemaRegion.createTimeseries(
+        path4, TSDataType.DOUBLE, TSEncoding.PLAIN, CompressionType.GZIP, null, null);
+
+    Map<PartialPath, Integer> result =
+        rSchemaRegion.getMeasurementCountGroupByLevel(new PartialPath("root.**"), 3, false);
+    Assert.assertEquals(3, (int) result.get(new PartialPath("root.test.sg.dd")));
+    Assert.assertEquals(1, (int) result.get(new PartialPath("root.test.sg.m4")));
+
+    result =
+        rSchemaRegion.getMeasurementCountGroupByLevel(new PartialPath("root.test.**"), 2, false);
+    Assert.assertEquals(4, (int) result.get(new PartialPath("root.test.sg")));
+  }
+
   @After
   public void clean() throws MetadataException {
     rSchemaRegion.deleteSchemaRegion();