You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by zy...@apache.org on 2023/04/18 00:27:15 UTC

[iotdb] branch rel/1.1 updated: [To rel/1.1] Resolve quadratic complexity issue when flushing numerous Internal/Entity nodes in SchemaFile (#9631)

This is an automated email from the ASF dual-hosted git repository.

zyk pushed a commit to branch rel/1.1
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/1.1 by this push:
     new 8ae80946a5 [To rel/1.1] Resolve quadratic complexity issue when flushing numerous Internal/Entity nodes in SchemaFile (#9631)
8ae80946a5 is described below

commit 8ae80946a5631736ea70c4025feca86e19d8087c
Author: ZhaoXin <x_...@163.com>
AuthorDate: Tue Apr 18 08:27:06 2023 +0800

    [To rel/1.1] Resolve quadratic complexity issue when flushing numerous Internal/Entity nodes in SchemaFile (#9631)
---
 .../store/disk/schemafile/SchemaFileConfig.java    |  2 +-
 .../mtree/store/disk/schemafile/SegmentedPage.java |  1 +
 .../store/disk/schemafile/pagemgr/PageManager.java | 90 ++++++++++++++++++++--
 .../metadata/mtree/schemafile/SchemaFileTest.java  | 87 ++++++++++++++++-----
 4 files changed, 152 insertions(+), 28 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/schemafile/SchemaFileConfig.java b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/schemafile/SchemaFileConfig.java
index 2a21891c41..90004b1b36 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/schemafile/SchemaFileConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/schemafile/SchemaFileConfig.java
@@ -68,7 +68,7 @@ public class SchemaFileConfig {
 
   // region Segment Configuration
 
-  public static final int SEG_HEADER_SIZE = 25; // in bytes
+  public static final int SEG_HEADER_SIZE = 25; // (bytes) also the de facto minimum segment size
   public static final short SEG_OFF_DIG =
       2; // length of short, which is the type of segment offset and index
   public static final short SEG_MAX_SIZ = (short) (PAGE_LENGTH - PAGE_HEADER_SIZE - SEG_OFF_DIG);
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/schemafile/SegmentedPage.java b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/schemafile/SegmentedPage.java
index 2b8e14c599..0b6df45008 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/schemafile/SegmentedPage.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/schemafile/SegmentedPage.java
@@ -38,6 +38,7 @@ public class SegmentedPage extends SchemaPage implements ISegmentedPage {
 
   // segment address array inside a page, map segmentIndex -> segmentOffset
   // if only one full-page segment inside, it still stores the offset
+  // TODO segment offset slots are never removed, since the list is 'sequential-indexed'
   private final transient List<Short> segOffsetLst;
 
   // maintains leaf segment instance inside this page, lazily instantiated
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/schemafile/pagemgr/PageManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/schemafile/pagemgr/PageManager.java
index ca5b04c855..48ab655a94 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/schemafile/pagemgr/PageManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/schemafile/pagemgr/PageManager.java
@@ -43,8 +43,10 @@ import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.nio.file.StandardOpenOption;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.LinkedHashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -81,6 +83,10 @@ public abstract class PageManager implements IPageManager {
   protected final Map<Integer, ISchemaPage> pageInstCache;
   protected final Map<Integer, ISchemaPage> dirtyPages;
 
+  // optimize retrieval of the smallest applicable DIRTY segmented page
+  // tiered by: MIN_SEG_SIZE, PAGE/16, PAGE/8, PAGE/4, PAGE/2, PAGE_SIZE
+  protected final LinkedList<Integer>[] tieredDirtyPageIndex = new LinkedList[SEG_SIZE_LST.length];
+
   protected final ReentrantLock evictLock;
   protected final PageLocks pageLocks;
 
@@ -102,6 +108,10 @@ public abstract class PageManager implements IPageManager {
       throws IOException, MetadataException {
     this.pageInstCache = Collections.synchronizedMap(new LinkedHashMap<>(PAGE_CACHE_SIZE, 1, true));
     this.dirtyPages = new ConcurrentHashMap<>();
+    for (int i = 0; i < tieredDirtyPageIndex.length; i++) {
+      tieredDirtyPageIndex[i] = new LinkedList<>();
+    }
+
     this.evictLock = new ReentrantLock();
     this.pageLocks = new PageLocks();
     this.lastPageIndex =
@@ -426,11 +436,13 @@ public abstract class PageManager implements IPageManager {
     }
     logWriter.commit();
     dirtyPages.clear();
+    Arrays.stream(tieredDirtyPageIndex).forEach(LinkedList::clear);
   }
 
   @Override
   public void clear() throws IOException, MetadataException {
     dirtyPages.clear();
+    Arrays.stream(tieredDirtyPageIndex).forEach(LinkedList::clear);
     pageInstCache.clear();
     lastPageIndex.set(0);
     logWriter = logWriter.renew();
@@ -486,10 +498,8 @@ public abstract class PageManager implements IPageManager {
     }
   }
 
-  @Deprecated
-  // TODO: improve to remove
   private long preAllocateSegment(short size) throws IOException, MetadataException {
-    ISegmentedPage page = getMinApplSegmentedPageInMem((short) (size + SEG_OFF_DIG));
+    ISegmentedPage page = getMinApplSegmentedPageInMem(size);
     return SchemaFile.getGlobalIndex(page.getPageIndex(), page.allocNewSegment(size));
   }
 
@@ -498,14 +508,42 @@ public abstract class PageManager implements IPageManager {
     return addPageToCache(page.getPageIndex(), page);
   }
 
+  /**
+   * Accessing dirtyPages will be expedited, while the retrieval from pageInstCache remains
+   * unaffected due to its limited capacity.
+   *
+   * @param size size of the expected segment
+   */
   protected ISegmentedPage getMinApplSegmentedPageInMem(short size) {
-    for (Map.Entry<Integer, ISchemaPage> entry : dirtyPages.entrySet()) {
-      if (entry.getValue().getAsSegmentedPage() != null
-          && entry.getValue().getAsSegmentedPage().isCapableForSegSize(size)) {
-        return dirtyPages.get(entry.getKey()).getAsSegmentedPage();
+    ISchemaPage targetPage = null;
+    int tierLoopCnt = 0;
+    for (int i = 0; i < tieredDirtyPageIndex.length && dirtyPages.size() > 0; i++) {
+      tierLoopCnt = tieredDirtyPageIndex[i].size();
+      while (size < SEG_SIZE_LST[i] && tierLoopCnt > 0) {
+        targetPage = dirtyPages.get(tieredDirtyPageIndex[i].pop());
+        tierLoopCnt--;
+
+        //  check validity of the retrieved targetPage, as the page type may have changed,
+        //   e.g., from SegmentedPage to InternalPage, or index could be stale
+        if (targetPage == null || targetPage.getAsSegmentedPage() == null) {
+          // invalid index for SegmentedPage, drop the index and get next
+          continue;
+        }
+
+        // suitable page for requested size
+        if (targetPage.getAsSegmentedPage().isCapableForSegSize(size)) {
+          sortSegmentedIntoIndex(targetPage, size);
+          return targetPage.getAsSegmentedPage();
+        }
+
+        // not large enough but legal for another retrieval
+        if (targetPage.getAsSegmentedPage().isCapableForSegSize(SEG_SIZE_LST[i])) {
+          tieredDirtyPageIndex[i].add(targetPage.getPageIndex());
+        }
       }
     }
 
+    // TODO refactor design related to pageInstCache to index its pages in further development
     for (Map.Entry<Integer, ISchemaPage> entry : pageInstCache.entrySet()) {
       if (entry.getValue().getAsSegmentedPage() != null
           && entry.getValue().getAsSegmentedPage().isCapableForSegSize(size)) {
@@ -516,6 +554,40 @@ public abstract class PageManager implements IPageManager {
     return allocateNewSegmentedPage().getAsSegmentedPage();
   }
 
+  /**
+   * Index SegmentedPage inside {@linkplain #dirtyPages} into tiered list by {@linkplain
+   * SchemaFileConfig#SEG_SIZE_LST}.
+   *
+   * <p>The level of its index depends on its AVAILABLE space.
+   *
+   * @param page SegmentedPage to be indexed, no guardian statements since all entrances are secured
+   *     for now
+   * @param newSegSize to re-integrate after a retrieval, the expected overhead shall be considered.
+   *     -1 for common dirty mark.
+   */
+  protected void sortSegmentedIntoIndex(ISchemaPage page, short newSegSize) {
+    // actual space occupied by a segment includes both its own length and the length of its offset.
+    // so available length for a segment is the spareSize minus the offset length
+    short availableSize =
+        newSegSize < 0
+            ? (short) (page.getAsSegmentedPage().getSpareSize() - SEG_OFF_DIG)
+            : (short) (page.getAsSegmentedPage().getSpareSize() - newSegSize - SEG_OFF_DIG);
+
+    // too small to index
+    if (availableSize < SEG_HEADER_SIZE) {
+      return;
+    }
+
+    // index range like: SEG_HEADER_SIZE <= [0] < SEG_SIZE_LST[0], ...
+    for (int i = 0; i < SEG_SIZE_LST.length; i++) {
+      // the last of SEG_SIZE_LST is the maximum page size, definitely larger than others
+      if (availableSize < SEG_SIZE_LST[i]) {
+        tieredDirtyPageIndex[i].add(page.getPageIndex());
+        return;
+      }
+    }
+  }
+
   protected synchronized ISchemaPage allocateNewSegmentedPage() {
     lastPageIndex.incrementAndGet();
     ISchemaPage newPage =
@@ -533,6 +605,10 @@ public abstract class PageManager implements IPageManager {
   protected void markDirty(ISchemaPage page) {
     page.markDirty();
     dirtyPages.put(page.getPageIndex(), page);
+
+    if (page.getAsSegmentedPage() != null) {
+      sortSegmentedIntoIndex(page, (short) -1);
+    }
   }
 
   protected ISchemaPage addPageToCache(int pageIndex, ISchemaPage page) {
diff --git a/server/src/test/java/org/apache/iotdb/db/metadata/mtree/schemafile/SchemaFileTest.java b/server/src/test/java/org/apache/iotdb/db/metadata/mtree/schemafile/SchemaFileTest.java
index 684e86e27a..9a018540ea 100644
--- a/server/src/test/java/org/apache/iotdb/db/metadata/mtree/schemafile/SchemaFileTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/metadata/mtree/schemafile/SchemaFileTest.java
@@ -233,10 +233,10 @@ public class SchemaFileTest {
   }
 
   @Test
-  public void testFaltTree() throws MetadataException, IOException {
+  public void testFlatTree() throws MetadataException, IOException {
     ISchemaFile sf = SchemaFile.initSchemaFile("root.test.vRoot1", TEST_SCHEMA_REGION_ID);
 
-    Iterator<IMNode> ite = getTreeBFT(getFlatTree(50000, "aa"));
+    Iterator<IMNode> ite = getTreeBFT(getFlatTree(6000, "aa"));
     while (ite.hasNext()) {
       IMNode cur = ite.next();
       if (!cur.isMeasurement()) {
@@ -274,8 +274,8 @@ public class SchemaFileTest {
   }
 
   @Test
-  public void test200KMeasurement() throws MetadataException, IOException {
-    int i = 200000, j = 20;
+  public void test2KMeasurement() throws MetadataException, IOException {
+    int i = 2000, j = 20;
     IMNode sgNode = new StorageGroupMNode(null, "sgRoot", 11111111L);
     ISchemaFile sf = SchemaFile.initSchemaFile(sgNode.getName(), TEST_SCHEMA_REGION_ID);
 
@@ -298,8 +298,8 @@ public class SchemaFileTest {
     sf.writeMNode(dev);
 
     Assert.assertEquals(
-        "ma_199406", sf.getChildNode(dev, "m_199406").getAsMeasurementMNode().getAlias());
-    Assert.assertEquals("m_1995", sf.getChildNode(dev, "ma_1995").getName());
+        "ma_1994", sf.getChildNode(dev, "m_1994").getAsMeasurementMNode().getAlias());
+    Assert.assertEquals("m_19", sf.getChildNode(dev, "ma_19").getName());
 
     sf.delete(dev);
     Assert.assertNull(sf.getChildNode(sgNode, "dev_2"));
@@ -307,8 +307,51 @@ public class SchemaFileTest {
   }
 
   @Test
-  public void test10KDevices() throws MetadataException, IOException {
-    int i = 1000;
+  public void testMassiveSegment() throws MetadataException, IOException {
+    IMNode sgNode = new StorageGroupMNode(null, "sgRoot", 11111111L);
+    fillChildren(sgNode, 500, "MEN", this::supplyEntity);
+    ISchemaFile sf = SchemaFile.initSchemaFile(sgNode.getName(), TEST_SCHEMA_REGION_ID);
+
+    // verify that writing massive numbers of segments no longer exhibits quadratic complexity
+    try {
+      sf.writeMNode(sgNode);
+    } finally {
+      sf.close();
+    }
+
+    IMNode sgNode2 = new StorageGroupMNode(null, "sgRoot2", 11111111L);
+    fillChildren(sgNode2, 5000, "MEN", this::supplyEntity);
+    ISchemaFile sf2 = SchemaFile.initSchemaFile(sgNode2.getName(), TEST_SCHEMA_REGION_ID);
+    try {
+      sf2.writeMNode(sgNode2);
+    } finally {
+      sf2.close();
+    }
+
+    int cnt = 0;
+    sf = SchemaFile.loadSchemaFile(sgNode.getName(), TEST_SCHEMA_REGION_ID);
+    Iterator<IMNode> ite = sf.getChildren(sgNode);
+    while (ite.hasNext()) {
+      cnt++;
+      ite.next();
+    }
+    Assert.assertEquals(cnt, 500);
+    sf.close();
+
+    cnt = 0;
+    sf = SchemaFile.loadSchemaFile(sgNode2.getName(), TEST_SCHEMA_REGION_ID);
+    ite = sf.getChildren(sgNode2);
+    while (ite.hasNext()) {
+      cnt++;
+      ite.next();
+    }
+    Assert.assertEquals(cnt, 5000);
+    sf.close();
+  }
+
+  @Test
+  public void testDevices() throws MetadataException, IOException {
+    int i = 100;
     IMNode sgNode = new StorageGroupMNode(null, "sgRoot", 11111111L);
 
     // write with empty entitiy
@@ -331,7 +374,7 @@ public class SchemaFileTest {
       }
 
       // update to entity
-      i = 1000;
+      i = 100;
       while (i >= 0) {
         long addr = getSegAddrInContainer(sgNode.getChild("dev_" + i));
         IMNode aDevice = new EntityMNode(sgNode, "dev_" + i);
@@ -383,18 +426,19 @@ public class SchemaFileTest {
     }
 
     Set<String> resName = new HashSet<>();
-    // more measurement
+    // fill resName set with more measurements
     for (IMNode etn : sgNode.getChildren().values()) {
-      int j = 1000;
+      int j = 50;
       while (j >= 0) {
         addMeasurementChild(etn, String.format("mtc2_%d_%d", i, j));
-        if (resName.size() < 101) {
+        if (Math.random() > 0.5) {
           resName.add(String.format("mtc2_%d_%d", i, j));
         }
         j--;
       }
     }
 
+    // fill arbitraryNode list
     orderedTree = getTreeBFT(sgNode);
     sf = SchemaFile.loadSchemaFile(sgNode.getName(), TEST_SCHEMA_REGION_ID);
     List<IMNode> arbitraryNode = new ArrayList<>();
@@ -403,7 +447,7 @@ public class SchemaFileTest {
         node = orderedTree.next();
         if (!node.isMeasurement() && !node.isStorageGroup()) {
           sf.writeMNode(node);
-          if (arbitraryNode.size() < 50) {
+          if (Math.random() > 0.5) {
             arbitraryNode.add(node);
           }
         }
@@ -417,14 +461,17 @@ public class SchemaFileTest {
 
     sf = SchemaFile.loadSchemaFile("sgRoot", TEST_SCHEMA_REGION_ID);
 
+    // verify alias of random measurement
     for (String key : resName) {
-      IMNode resNode = sf.getChildNode(arbitraryNode.get(arbitraryNode.size() - 3), key);
+      IMNode resNode =
+          sf.getChildNode(arbitraryNode.get((int) (arbitraryNode.size() * Math.random())), key);
       Assert.assertTrue(
           resNode.getAsMeasurementMNode().getAlias().equals(resNode.getName() + "alias"));
     }
 
-    Iterator<IMNode> res = sf.getChildren(arbitraryNode.get(arbitraryNode.size() - 1));
-    int i2 = 0;
+    // verify children subset of random entity node
+    Iterator<IMNode> res =
+        sf.getChildren(arbitraryNode.get((int) (arbitraryNode.size() * Math.random())));
     while (res.hasNext()) {
       resName.remove(res.next().getName());
     }
@@ -593,11 +640,11 @@ public class SchemaFileTest {
   }
 
   @Test
-  public void test200KAlias() throws Exception {
+  public void test2KAlias() throws Exception {
     ISchemaFile sf = SchemaFile.initSchemaFile("root.sg", TEST_SCHEMA_REGION_ID);
     IMNode sgNode = new StorageGroupMNode(null, "mma", 111111111L);
     // 5 devices, each for 200k measurements
-    int factor20K = 20000;
+    int factor2K = 2000;
     List<IMNode> devs = new ArrayList<>();
     List<List> senList = new ArrayList<>();
     Map<String, String> aliasAns = new HashMap<>();
@@ -610,7 +657,7 @@ public class SchemaFileTest {
 
       for (IMNode dev : devs) {
         List<IMNode> sens = new ArrayList<>();
-        for (int i = 0; i < factor20K; i++) {
+        for (int i = 0; i < factor2K; i++) {
           sens.add(getMeasurementNode(dev, "s_" + i, null));
           dev.addChild(sens.get(i));
 
@@ -662,7 +709,7 @@ public class SchemaFileTest {
         cnt++;
         children.next();
       }
-      Assert.assertEquals(factor20K, cnt);
+      Assert.assertEquals(factor2K, cnt);
 
     } finally {
       sf.close();