You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by zy...@apache.org on 2023/03/28 00:34:10 UTC

[iotdb] branch master updated: [IOTDB-5732] Add Device Schema Statistics (#9466)

This is an automated email from the ASF dual-hosted git repository.

zyk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 7f9c9ff29b [IOTDB-5732] Add Device Schema Statistics (#9466)
7f9c9ff29b is described below

commit 7f9c9ff29b3c574fd0b07e3a9f8f7d1bb1c16afa
Author: Chen YZ <43...@users.noreply.github.com>
AuthorDate: Tue Mar 28 08:33:59 2023 +0800

    [IOTDB-5732] Add Device Schema Statistics (#9466)
---
 .../db/metadata/mtree/MTreeBelowSGCachedImpl.java  | 20 ++++---
 .../db/metadata/mtree/store/CachedMTreeStore.java  |  8 +--
 .../db/metadata/mtree/store/MemMTreeStore.java     |  2 +
 .../metadata/rescon/ISchemaRegionStatistics.java   |  2 +
 .../metadata/rescon/MemSchemaRegionStatistics.java | 15 ++++++
 .../schemaregion/SchemaRegionMemoryImpl.java       | 37 ++++++-------
 .../schemaregion/SchemaRegionSchemaFileImpl.java   | 63 +++++++++++-----------
 .../schemaRegion/SchemaStatisticsTest.java         | 31 +++++++++++
 8 files changed, 114 insertions(+), 64 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGCachedImpl.java b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGCachedImpl.java
index f279497c94..9425a102a9 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGCachedImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGCachedImpl.java
@@ -122,6 +122,7 @@ public class MTreeBelowSGCachedImpl {
       Function<IMeasurementMNode<ICachedMNode>, Map<String, String>> tagGetter,
       Runnable flushCallback,
       Consumer<IMeasurementMNode<ICachedMNode>> measurementProcess,
+      Consumer<IDeviceMNode<ICachedMNode>> deviceProcess,
       int schemaRegionId,
       CachedSchemaRegionStatistics regionStatistics)
       throws MetadataException, IOException {
@@ -133,14 +134,17 @@ public class MTreeBelowSGCachedImpl {
     this.rootNode = store.generatePrefix(storageGroupPath);
     levelOfSG = storageGroupPath.getNodeLength() - 1;
 
-    // recover measurement
-    try (MeasurementCollector<Void, ICachedMNode> collector =
-        new MeasurementCollector<Void, ICachedMNode>(
+    // recover MNode
+    try (MNodeCollector<Void, ICachedMNode> collector =
+        new MNodeCollector<Void, ICachedMNode>(
             this.rootNode, new PartialPath(storageGroupMNode.getFullPath()), this.store, true) {
-
-          protected Void collectMeasurement(IMeasurementMNode<ICachedMNode> node) {
-            measurementProcess.accept(node);
-            regionStatistics.addTimeseries(1L);
+          @Override
+          protected Void collectMNode(ICachedMNode node) {
+            if (node.isMeasurement()) {
+              measurementProcess.accept(node.getAsMeasurementMNode());
+            } else if (node.isDevice()) {
+              deviceProcess.accept(node.getAsDeviceMNode());
+            }
             return null;
           }
         }) {
@@ -164,7 +168,7 @@ public class MTreeBelowSGCachedImpl {
     levelOfSG = storageGroupMNode.getPartialPath().getNodeLength() - 1;
     this.tagGetter = tagGetter;
 
-    // recover measurement
+    // recover MNode
     try (MNodeCollector<Void, ICachedMNode> collector =
         new MNodeCollector<Void, ICachedMNode>(
             this.rootNode, new PartialPath(storageGroupMNode.getFullPath()), this.store, true) {
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/CachedMTreeStore.java b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/CachedMTreeStore.java
index c7ae851bc9..92b01ecc8c 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/CachedMTreeStore.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/CachedMTreeStore.java
@@ -58,17 +58,11 @@ public class CachedMTreeStore implements IMTreeStore<ICachedMNode> {
   private static final Logger logger = LoggerFactory.getLogger(CachedMTreeStore.class);
 
   private final MemManager memManager;
-
   private final ICacheManager cacheManager;
-
   private ISchemaFile file;
-
   private ICachedMNode root;
-
   private final Runnable flushCallback;
-
   private final IMNodeFactory<ICachedMNode> nodeFactory = CacheMNodeFactory.getInstance();
-
   private final CachedSchemaRegionStatistics regionStatistics;
 
   private final StampedWriterPreferredLock lock = new StampedWriterPreferredLock();
@@ -339,6 +333,7 @@ public class CachedMTreeStore implements IMTreeStore<ICachedMNode> {
   public IDeviceMNode<ICachedMNode> setToEntity(ICachedMNode node) {
     IDeviceMNode<ICachedMNode> result = MNodeUtils.setToEntity(node, nodeFactory);
     if (result != node) {
+      regionStatistics.addDevice();
       memManager.updatePinnedSize(result.estimateSize() - node.estimateSize());
     }
     updateMNode(result.getAsMNode());
@@ -349,6 +344,7 @@ public class CachedMTreeStore implements IMTreeStore<ICachedMNode> {
   public ICachedMNode setToInternal(IDeviceMNode<ICachedMNode> entityMNode) {
     ICachedMNode result = MNodeUtils.setToInternal(entityMNode, nodeFactory);
     if (result != entityMNode) {
+      regionStatistics.deleteDevice();
       memManager.updatePinnedSize(result.estimateSize() - entityMNode.estimateSize());
     }
     updateMNode(result);
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/MemMTreeStore.java b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/MemMTreeStore.java
index 356a15d3c6..5961048596 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/MemMTreeStore.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/MemMTreeStore.java
@@ -139,6 +139,7 @@ public class MemMTreeStore implements IMTreeStore<IMemMNode> {
   public IDeviceMNode<IMemMNode> setToEntity(IMemMNode node) {
     IDeviceMNode<IMemMNode> result = MNodeUtils.setToEntity(node, nodeFactory);
     if (result != node) {
+      regionStatistics.addDevice();
       requestMemory(result.estimateSize() - node.estimateSize());
     }
 
@@ -152,6 +153,7 @@ public class MemMTreeStore implements IMTreeStore<IMemMNode> {
   public IMemMNode setToInternal(IDeviceMNode<IMemMNode> entityMNode) {
     IMemMNode result = MNodeUtils.setToInternal(entityMNode, nodeFactory);
     if (result != entityMNode) {
+      regionStatistics.deleteDevice();
       releaseMemory(entityMNode.estimateSize() - result.estimateSize());
     }
     if (result.isDatabase()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/rescon/ISchemaRegionStatistics.java b/server/src/main/java/org/apache/iotdb/db/metadata/rescon/ISchemaRegionStatistics.java
index 6dade84995..066ccbed5d 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/rescon/ISchemaRegionStatistics.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/rescon/ISchemaRegionStatistics.java
@@ -28,6 +28,8 @@ public interface ISchemaRegionStatistics {
 
   long getSeriesNumber();
 
+  long getDevicesNumber();
+
   int getTemplateActivatedNumber();
 
   long getTemplateSeriesNumber();
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/rescon/MemSchemaRegionStatistics.java b/server/src/main/java/org/apache/iotdb/db/metadata/rescon/MemSchemaRegionStatistics.java
index c8747d3b07..35e6dfa964 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/rescon/MemSchemaRegionStatistics.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/rescon/MemSchemaRegionStatistics.java
@@ -31,6 +31,7 @@ public class MemSchemaRegionStatistics implements ISchemaRegionStatistics {
   private final int schemaRegionId;
   private final AtomicLong memoryUsage = new AtomicLong(0);
   private final AtomicLong seriesNumber = new AtomicLong(0);
+  private final AtomicLong devicesNumber = new AtomicLong(0);
   private final Map<Integer, Integer> templateUsage = new ConcurrentHashMap<>();
 
   private long mLogLength = 0;
@@ -70,6 +71,19 @@ public class MemSchemaRegionStatistics implements ISchemaRegionStatistics {
     schemaEngineStatistics.deleteTimeseries(deletedNum);
   }
 
+  @Override
+  public long getDevicesNumber() {
+    return devicesNumber.get();
+  }
+
+  public void addDevice() {
+    devicesNumber.incrementAndGet();
+  }
+
+  public void deleteDevice() {
+    devicesNumber.decrementAndGet();
+  }
+
   @Override
   public int getTemplateActivatedNumber() {
     return templateUsage.size();
@@ -130,6 +144,7 @@ public class MemSchemaRegionStatistics implements ISchemaRegionStatistics {
     schemaEngineStatistics.deleteTimeseries(seriesNumber.get());
     memoryUsage.getAndSet(0);
     seriesNumber.getAndSet(0);
+    devicesNumber.getAndSet(0);
     templateUsage.forEach(
         (templateId, cnt) -> schemaEngineStatistics.deactivateTemplate(templateId, cnt));
     templateUsage.clear();
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java
index 209c6bc51d..b9bec0d522 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java
@@ -130,10 +130,10 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
   private boolean isRecovering = true;
   private volatile boolean initialized = false;
 
-  private String storageGroupDirPath;
-  private String schemaRegionDirPath;
-  private String storageGroupFullPath;
-  private SchemaRegionId schemaRegionId;
+  private final String storageGroupDirPath;
+  private final String schemaRegionDirPath;
+  private final String storageGroupFullPath;
+  private final SchemaRegionId schemaRegionId;
 
   // the log file writer
   private boolean usingMLog = true;
@@ -145,7 +145,7 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
   private TagManager tagManager;
 
   // seriesNumberMonitor may be null
-  private final ISeriesNumerMonitor seriesNumerMonitor;
+  private final ISeriesNumerMonitor seriesNumberMonitor;
 
   // region Interfaces and Implementation of initialization、snapshot、recover and clear
   public SchemaRegionMemoryImpl(ISchemaRegionParams schemaRegionParams) throws MetadataException {
@@ -168,7 +168,7 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
       }
     }
 
-    this.seriesNumerMonitor = schemaRegionParams.getSeriesNumberMonitor();
+    this.seriesNumberMonitor = schemaRegionParams.getSeriesNumberMonitor();
     this.regionStatistics =
         new MemSchemaRegionStatistics(
             schemaRegionId.getId(), schemaRegionParams.getSchemaEngineStatistics());
@@ -390,8 +390,8 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
   public synchronized void deleteSchemaRegion() throws MetadataException {
     // collect all the LeafMNode in this schema region
     long seriesCount = regionStatistics.getSeriesNumber();
-    if (seriesNumerMonitor != null) {
-      seriesNumerMonitor.deleteTimeSeries((int) seriesCount);
+    if (seriesNumberMonitor != null) {
+      seriesNumberMonitor.deleteTimeSeries((int) seriesCount);
     }
 
     // clear all the components and release all the file handlers
@@ -478,6 +478,7 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
                 }
               },
               deviceMNode -> {
+                regionStatistics.addDevice();
                 if (deviceMNode.getSchemaTemplateIdWithState() >= 0) {
                   regionStatistics.activateTemplate(deviceMNode.getSchemaTemplateId());
                 }
@@ -531,7 +532,7 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
       throw new SeriesOverflowException();
     }
 
-    if (seriesNumerMonitor != null && !seriesNumerMonitor.addTimeSeries(1)) {
+    if (seriesNumberMonitor != null && !seriesNumberMonitor.addTimeSeries(1)) {
       throw new SeriesNumberOverflowException();
     }
 
@@ -554,8 +555,8 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
                 plan.getProps(),
                 plan.getAlias());
       } catch (Throwable t) {
-        if (seriesNumerMonitor != null) {
-          seriesNumerMonitor.deleteTimeSeries(1);
+        if (seriesNumberMonitor != null) {
+          seriesNumberMonitor.deleteTimeSeries(1);
         }
         throw t;
       }
@@ -611,7 +612,7 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
       throw new SeriesOverflowException();
     }
 
-    if (seriesNumerMonitor != null && !seriesNumerMonitor.addTimeSeries(seriesCount)) {
+    if (seriesNumberMonitor != null && !seriesNumberMonitor.addTimeSeries(seriesCount)) {
       throw new SeriesNumberOverflowException();
     }
 
@@ -640,8 +641,8 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
                 plan.getCompressors(),
                 plan.getAliasList());
       } catch (Throwable t) {
-        if (seriesNumerMonitor != null) {
-          seriesNumerMonitor.deleteTimeSeries(seriesCount);
+        if (seriesNumberMonitor != null) {
+          seriesNumberMonitor.deleteTimeSeries(seriesCount);
         }
         throw t;
       }
@@ -783,8 +784,8 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
     removeFromTagInvertedIndex(measurementMNode);
 
     regionStatistics.deleteTimeseries(1L);
-    if (seriesNumerMonitor != null) {
-      seriesNumerMonitor.deleteTimeSeries(1);
+    if (seriesNumberMonitor != null) {
+      seriesNumberMonitor.deleteTimeSeries(1);
     }
   }
 
@@ -800,8 +801,8 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
     removeFromTagInvertedIndex(measurementMNode);
 
     regionStatistics.deleteTimeseries(1L);
-    if (seriesNumerMonitor != null) {
-      seriesNumerMonitor.deleteTimeSeries(1);
+    if (seriesNumberMonitor != null) {
+      seriesNumberMonitor.deleteTimeSeries(1);
     }
   }
   // endregion
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java
index 897e733e15..7f100b0b6b 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java
@@ -94,6 +94,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.function.Consumer;
 
 import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.PATH_SEPARATOR;
 
@@ -190,19 +191,8 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
               new PartialPath(storageGroupFullPath),
               tagManager::readTags,
               this::flushCallback,
-              measurementMNode -> {
-                if (measurementMNode.getOffset() == -1) {
-                  return;
-                }
-                try {
-                  tagManager.recoverIndex(measurementMNode.getOffset(), measurementMNode);
-                } catch (IOException e) {
-                  logger.error(
-                      "Failed to recover tagIndex for {} in schemaRegion {}.",
-                      storageGroupFullPath + PATH_SEPARATOR + measurementMNode.getFullPath(),
-                      schemaRegionId);
-                }
-              },
+              measurementInitProcess(),
+              deviceInitProcess(),
               schemaRegionId.getId(),
               regionStatistics);
 
@@ -226,6 +216,32 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
     initialized = true;
   }
 
+  private Consumer<IMeasurementMNode<ICachedMNode>> measurementInitProcess() {
+    return measurementMNode -> {
+      regionStatistics.addTimeseries(1L);
+      if (measurementMNode.getOffset() == -1) {
+        return;
+      }
+      try {
+        tagManager.recoverIndex(measurementMNode.getOffset(), measurementMNode);
+      } catch (IOException e) {
+        logger.error(
+            "Failed to recover tagIndex for {} in schemaRegion {}.",
+            storageGroupFullPath + PATH_SEPARATOR + measurementMNode.getFullPath(),
+            schemaRegionId);
+      }
+    };
+  }
+
+  private Consumer<IDeviceMNode<ICachedMNode>> deviceInitProcess() {
+    return deviceMNode -> {
+      regionStatistics.addDevice();
+      if (deviceMNode.getSchemaTemplateIdWithState() >= 0) {
+        regionStatistics.activateTemplate(deviceMNode.getSchemaTemplateId());
+      }
+    };
+  }
+
   private void flushCallback() {
     if (usingMLog && !isRecovering) {
       try {
@@ -505,25 +521,8 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
               storageGroupFullPath,
               schemaRegionId.getId(),
               regionStatistics,
-              measurementMNode -> {
-                regionStatistics.addTimeseries(1L);
-                if (measurementMNode.getOffset() == -1) {
-                  return;
-                }
-                try {
-                  tagManager.recoverIndex(measurementMNode.getOffset(), measurementMNode);
-                } catch (IOException e) {
-                  logger.error(
-                      "Failed to recover tagIndex for {} in schemaRegion {}.",
-                      storageGroupFullPath + PATH_SEPARATOR + measurementMNode.getFullPath(),
-                      schemaRegionId);
-                }
-              },
-              deviceMNode -> {
-                if (deviceMNode.getSchemaTemplateIdWithState() >= 0) {
-                  regionStatistics.activateTemplate(deviceMNode.getSchemaTemplateId());
-                }
-              },
+              measurementInitProcess(),
+              deviceInitProcess(),
               tagManager::readTags,
               this::flushCallback);
       logger.info(
diff --git a/server/src/test/java/org/apache/iotdb/db/metadata/schemaRegion/SchemaStatisticsTest.java b/server/src/test/java/org/apache/iotdb/db/metadata/schemaRegion/SchemaStatisticsTest.java
index 741a887084..e1f70963ff 100644
--- a/server/src/test/java/org/apache/iotdb/db/metadata/schemaRegion/SchemaStatisticsTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/metadata/schemaRegion/SchemaStatisticsTest.java
@@ -43,6 +43,7 @@ import org.junit.Ignore;
 import org.junit.Test;
 
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -199,6 +200,36 @@ public class SchemaStatisticsTest extends AbstractSchemaRegionTest {
     Assert.assertEquals(4, engineStatistics.getTotalSeriesNumber());
   }
 
+  @Test
+  public void testDeviceNumStatistics() throws Exception {
+    ISchemaRegion schemaRegion1 = getSchemaRegion("root.sg1", 0);
+    ISchemaRegion schemaRegion2 = getSchemaRegion("root.sg2", 1);
+    ISchemaEngineStatistics engineStatistics =
+        SchemaEngine.getInstance().getSchemaEngineStatistics();
+
+    SchemaRegionTestUtil.createSimpleTimeseriesByList(
+        schemaRegion1, Arrays.asList("root.sg1.d0", "root.sg1.d1.s1", "root.sg1.d1.s2.t1"));
+    SchemaRegionTestUtil.createSimpleTimeseriesByList(
+        schemaRegion2, Arrays.asList("root.sg2.d1.s3", "root.sg2.d2.s1", "root.sg2.d2.s2"));
+    SchemaRegionTestUtil.createSimpleTimeseriesByList(
+        schemaRegion2, Collections.singletonList("root.sg2.s1"));
+    // check device number
+    Assert.assertEquals(3, schemaRegion1.getSchemaRegionStatistics().getDevicesNumber());
+    Assert.assertEquals(3, schemaRegion2.getSchemaRegionStatistics().getDevicesNumber());
+
+    PathPatternTree patternTree = new PathPatternTree();
+    patternTree.appendPathPattern(new PartialPath("root.**.s1"));
+    patternTree.constructTree();
+    Assert.assertTrue(schemaRegion1.constructSchemaBlackList(patternTree) >= 1);
+    Assert.assertTrue(schemaRegion2.constructSchemaBlackList(patternTree) >= 1);
+    schemaRegion1.deleteTimeseriesInBlackList(patternTree);
+    schemaRegion2.deleteTimeseriesInBlackList(patternTree);
+
+    // check device number after deleting the matched timeseries
+    Assert.assertEquals(2, schemaRegion1.getSchemaRegionStatistics().getDevicesNumber());
+    Assert.assertEquals(2, schemaRegion2.getSchemaRegionStatistics().getDevicesNumber());
+  }
+
   @Test
   public void testSchemaFileNodeStatistics() throws Exception {
     if (testParams.getSchemaEngineMode().equals("Schema_File")) {