You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by zy...@apache.org on 2023/04/18 00:27:15 UTC
[iotdb] branch rel/1.1 updated: [To rel/1.1] Resolve quadratic complexity issue when flushing numerous Internal/Entity nodes in SchemaFile (#9631)
This is an automated email from the ASF dual-hosted git repository.
zyk pushed a commit to branch rel/1.1
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/1.1 by this push:
new 8ae80946a5 [To rel/1.1] Resolve quadratic complexity issue when flushing numerous Internal/Entity nodes in SchemaFile (#9631)
8ae80946a5 is described below
commit 8ae80946a5631736ea70c4025feca86e19d8087c
Author: ZhaoXin <x_...@163.com>
AuthorDate: Tue Apr 18 08:27:06 2023 +0800
[To rel/1.1] Resolve quadratic complexity issue when flushing numerous Internal/Entity nodes in SchemaFile (#9631)
---
.../store/disk/schemafile/SchemaFileConfig.java | 2 +-
.../mtree/store/disk/schemafile/SegmentedPage.java | 1 +
.../store/disk/schemafile/pagemgr/PageManager.java | 90 ++++++++++++++++++++--
.../metadata/mtree/schemafile/SchemaFileTest.java | 87 ++++++++++++++++-----
4 files changed, 152 insertions(+), 28 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/schemafile/SchemaFileConfig.java b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/schemafile/SchemaFileConfig.java
index 2a21891c41..90004b1b36 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/schemafile/SchemaFileConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/schemafile/SchemaFileConfig.java
@@ -68,7 +68,7 @@ public class SchemaFileConfig {
// region Segment Configuration
- public static final int SEG_HEADER_SIZE = 25; // in bytes
+ public static final int SEG_HEADER_SIZE = 25; // (bytes) minimum segment size de facto
public static final short SEG_OFF_DIG =
2; // length of short, which is the type of segment offset and index
public static final short SEG_MAX_SIZ = (short) (PAGE_LENGTH - PAGE_HEADER_SIZE - SEG_OFF_DIG);
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/schemafile/SegmentedPage.java b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/schemafile/SegmentedPage.java
index 2b8e14c599..0b6df45008 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/schemafile/SegmentedPage.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/schemafile/SegmentedPage.java
@@ -38,6 +38,7 @@ public class SegmentedPage extends SchemaPage implements ISegmentedPage {
// segment address array inside a page, map segmentIndex -> segmentOffset
// if only one full-page segment inside, it still stores the offset
+ // TODO offset bits of segment never removed since it is 'sequential-indexed'
private final transient List<Short> segOffsetLst;
// maintains leaf segment instance inside this page, lazily instantiated
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/schemafile/pagemgr/PageManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/schemafile/pagemgr/PageManager.java
index ca5b04c855..48ab655a94 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/schemafile/pagemgr/PageManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/schemafile/pagemgr/PageManager.java
@@ -43,8 +43,10 @@ import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -81,6 +83,10 @@ public abstract class PageManager implements IPageManager {
protected final Map<Integer, ISchemaPage> pageInstCache;
protected final Map<Integer, ISchemaPage> dirtyPages;
+ // optimize retrieval of the smallest applicable DIRTY segmented page
+ // tiered by: MIN_SEG_SIZE, PAGE/16, PAGE/8, PAGE/4, PAGE/2, PAGE_SIZE
+ protected final LinkedList<Integer>[] tieredDirtyPageIndex = new LinkedList[SEG_SIZE_LST.length];
+
protected final ReentrantLock evictLock;
protected final PageLocks pageLocks;
@@ -102,6 +108,10 @@ public abstract class PageManager implements IPageManager {
throws IOException, MetadataException {
this.pageInstCache = Collections.synchronizedMap(new LinkedHashMap<>(PAGE_CACHE_SIZE, 1, true));
this.dirtyPages = new ConcurrentHashMap<>();
+ for (int i = 0; i < tieredDirtyPageIndex.length; i++) {
+ tieredDirtyPageIndex[i] = new LinkedList<>();
+ }
+
this.evictLock = new ReentrantLock();
this.pageLocks = new PageLocks();
this.lastPageIndex =
@@ -426,11 +436,13 @@ public abstract class PageManager implements IPageManager {
}
logWriter.commit();
dirtyPages.clear();
+ Arrays.stream(tieredDirtyPageIndex).forEach(LinkedList::clear);
}
@Override
public void clear() throws IOException, MetadataException {
dirtyPages.clear();
+ Arrays.stream(tieredDirtyPageIndex).forEach(LinkedList::clear);
pageInstCache.clear();
lastPageIndex.set(0);
logWriter = logWriter.renew();
@@ -486,10 +498,8 @@ public abstract class PageManager implements IPageManager {
}
}
- @Deprecated
- // TODO: improve to remove
private long preAllocateSegment(short size) throws IOException, MetadataException {
- ISegmentedPage page = getMinApplSegmentedPageInMem((short) (size + SEG_OFF_DIG));
+ ISegmentedPage page = getMinApplSegmentedPageInMem(size);
return SchemaFile.getGlobalIndex(page.getPageIndex(), page.allocNewSegment(size));
}
@@ -498,14 +508,42 @@ public abstract class PageManager implements IPageManager {
return addPageToCache(page.getPageIndex(), page);
}
+ /**
+ * Looks up dirtyPages through the tiered index instead of scanning every entry; pageInstCache is
+ * still scanned linearly, which is acceptable because of its limited capacity.
+ *
+ * @param size size of the expected segment
+ */
protected ISegmentedPage getMinApplSegmentedPageInMem(short size) {
- for (Map.Entry<Integer, ISchemaPage> entry : dirtyPages.entrySet()) {
- if (entry.getValue().getAsSegmentedPage() != null
- && entry.getValue().getAsSegmentedPage().isCapableForSegSize(size)) {
- return dirtyPages.get(entry.getKey()).getAsSegmentedPage();
+ ISchemaPage targetPage = null;
+ int tierLoopCnt = 0;
+ for (int i = 0; i < tieredDirtyPageIndex.length && dirtyPages.size() > 0; i++) {
+ tierLoopCnt = tieredDirtyPageIndex[i].size();
+ while (size < SEG_SIZE_LST[i] && tierLoopCnt > 0) {
+ targetPage = dirtyPages.get(tieredDirtyPageIndex[i].pop());
+ tierLoopCnt--;
+
+ // check validity of the retrieved targetPage, as the page type may have changed,
+ // e.g., from SegmentedPage to InternalPage, or index could be stale
+ if (targetPage == null || targetPage.getAsSegmentedPage() == null) {
+ // invalid index for SegmentedPage, drop the index and get next
+ continue;
+ }
+
+ // suitable page for requested size
+ if (targetPage.getAsSegmentedPage().isCapableForSegSize(size)) {
+ sortSegmentedIntoIndex(targetPage, size);
+ return targetPage.getAsSegmentedPage();
+ }
+
+ // not large enough but legal for another retrieval
+ if (targetPage.getAsSegmentedPage().isCapableForSegSize(SEG_SIZE_LST[i])) {
+ tieredDirtyPageIndex[i].add(targetPage.getPageIndex());
+ }
}
}
+ // TODO refactor design related to pageInstCache to index its pages in further development
for (Map.Entry<Integer, ISchemaPage> entry : pageInstCache.entrySet()) {
if (entry.getValue().getAsSegmentedPage() != null
&& entry.getValue().getAsSegmentedPage().isCapableForSegSize(size)) {
@@ -516,6 +554,40 @@ public abstract class PageManager implements IPageManager {
return allocateNewSegmentedPage().getAsSegmentedPage();
}
+ /**
+  * Registers a SegmentedPage from {@linkplain #dirtyPages} in the tier of {@linkplain
+  * #tieredDirtyPageIndex} that corresponds to the space it can still offer, with tiers bounded by
+  * {@linkplain SchemaFileConfig#SEG_SIZE_LST}.
+  *
+  * <p>A segment costs its own length plus the length of its offset entry, so the space offered to
+  * a future segment is the spare size minus {@code SEG_OFF_DIG}, further reduced by {@code
+  * newSegSize} when a segment of that size is about to be placed on the page.
+  *
+  * @param page page to be indexed; its {@code getAsSegmentedPage()} result is dereferenced without
+  *     a null check, so callers must only pass segmented pages
+  * @param newSegSize size of the segment about to be allocated on the page after a retrieval, or
+  *     a negative value (-1 by convention) when the page is merely being marked dirty
+  */
+ protected void sortSegmentedIntoIndex(ISchemaPage page, short newSegSize) {
+   // a negative size means nothing is being reserved on the page right now
+   int reserved = newSegSize < 0 ? 0 : newSegSize;
+   short availableSize =
+       (short) (page.getAsSegmentedPage().getSpareSize() - reserved - SEG_OFF_DIG);
+
+   // pages that cannot even hold a segment header are not worth indexing
+   if (availableSize >= SEG_HEADER_SIZE) {
+     // find the first tier whose upper bound exceeds the available space:
+     // SEG_HEADER_SIZE <= [0] < SEG_SIZE_LST[0] <= [1] < SEG_SIZE_LST[1], ...
+     int tier = 0;
+     while (tier < SEG_SIZE_LST.length && availableSize >= SEG_SIZE_LST[tier]) {
+       tier++;
+     }
+
+     // the last bound is the maximum page size, so a tier is expected to be found
+     if (tier < SEG_SIZE_LST.length) {
+       tieredDirtyPageIndex[tier].add(page.getPageIndex());
+     }
+   }
+ }
+
protected synchronized ISchemaPage allocateNewSegmentedPage() {
lastPageIndex.incrementAndGet();
ISchemaPage newPage =
@@ -533,6 +605,10 @@ public abstract class PageManager implements IPageManager {
protected void markDirty(ISchemaPage page) {
page.markDirty();
dirtyPages.put(page.getPageIndex(), page);
+
+ if (page.getAsSegmentedPage() != null) {
+ sortSegmentedIntoIndex(page, (short) -1);
+ }
}
protected ISchemaPage addPageToCache(int pageIndex, ISchemaPage page) {
diff --git a/server/src/test/java/org/apache/iotdb/db/metadata/mtree/schemafile/SchemaFileTest.java b/server/src/test/java/org/apache/iotdb/db/metadata/mtree/schemafile/SchemaFileTest.java
index 684e86e27a..9a018540ea 100644
--- a/server/src/test/java/org/apache/iotdb/db/metadata/mtree/schemafile/SchemaFileTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/metadata/mtree/schemafile/SchemaFileTest.java
@@ -233,10 +233,10 @@ public class SchemaFileTest {
}
@Test
- public void testFaltTree() throws MetadataException, IOException {
+ public void testFlatTree() throws MetadataException, IOException {
ISchemaFile sf = SchemaFile.initSchemaFile("root.test.vRoot1", TEST_SCHEMA_REGION_ID);
- Iterator<IMNode> ite = getTreeBFT(getFlatTree(50000, "aa"));
+ Iterator<IMNode> ite = getTreeBFT(getFlatTree(6000, "aa"));
while (ite.hasNext()) {
IMNode cur = ite.next();
if (!cur.isMeasurement()) {
@@ -274,8 +274,8 @@ public class SchemaFileTest {
}
@Test
- public void test200KMeasurement() throws MetadataException, IOException {
- int i = 200000, j = 20;
+ public void test2KMeasurement() throws MetadataException, IOException {
+ int i = 2000, j = 20;
IMNode sgNode = new StorageGroupMNode(null, "sgRoot", 11111111L);
ISchemaFile sf = SchemaFile.initSchemaFile(sgNode.getName(), TEST_SCHEMA_REGION_ID);
@@ -298,8 +298,8 @@ public class SchemaFileTest {
sf.writeMNode(dev);
Assert.assertEquals(
- "ma_199406", sf.getChildNode(dev, "m_199406").getAsMeasurementMNode().getAlias());
- Assert.assertEquals("m_1995", sf.getChildNode(dev, "ma_1995").getName());
+ "ma_1994", sf.getChildNode(dev, "m_1994").getAsMeasurementMNode().getAlias());
+ Assert.assertEquals("m_19", sf.getChildNode(dev, "ma_19").getName());
sf.delete(dev);
Assert.assertNull(sf.getChildNode(sgNode, "dev_2"));
@@ -307,8 +307,51 @@ public class SchemaFileTest {
}
@Test
- public void test10KDevices() throws MetadataException, IOException {
- int i = 1000;
+ public void testMassiveSegment() throws MetadataException, IOException {
+   // Regression guard for the quadratic dirty-page scan: write storage groups holding many
+   // entity children (500 and 5000), then reload each file and count the children back.
+   IMNode sgNode = new StorageGroupMNode(null, "sgRoot", 11111111L);
+   fillChildren(sgNode, 500, "MEN", this::supplyEntity);
+   ISchemaFile sf = SchemaFile.initSchemaFile(sgNode.getName(), TEST_SCHEMA_REGION_ID);
+
+   try {
+     sf.writeMNode(sgNode);
+   } finally {
+     sf.close();
+   }
+
+   IMNode sgNode2 = new StorageGroupMNode(null, "sgRoot2", 11111111L);
+   fillChildren(sgNode2, 5000, "MEN", this::supplyEntity);
+   ISchemaFile sf2 = SchemaFile.initSchemaFile(sgNode2.getName(), TEST_SCHEMA_REGION_ID);
+   try {
+     sf2.writeMNode(sgNode2);
+   } finally {
+     sf2.close();
+   }
+
+   // reload the smaller file; close it even when the check fails so the file is not leaked
+   int cnt = 0;
+   sf = SchemaFile.loadSchemaFile(sgNode.getName(), TEST_SCHEMA_REGION_ID);
+   try {
+     Iterator<IMNode> ite = sf.getChildren(sgNode);
+     while (ite.hasNext()) {
+       cnt++;
+       ite.next();
+     }
+     // JUnit expects (expected, actual); the reverse order yields a misleading failure message
+     Assert.assertEquals(500, cnt);
+   } finally {
+     sf.close();
+   }
+
+   // same verification for the larger file
+   cnt = 0;
+   sf = SchemaFile.loadSchemaFile(sgNode2.getName(), TEST_SCHEMA_REGION_ID);
+   try {
+     Iterator<IMNode> ite = sf.getChildren(sgNode2);
+     while (ite.hasNext()) {
+       cnt++;
+       ite.next();
+     }
+     Assert.assertEquals(5000, cnt);
+   } finally {
+     sf.close();
+   }
+ }
+
+ @Test
+ public void testDevices() throws MetadataException, IOException {
+ int i = 100;
IMNode sgNode = new StorageGroupMNode(null, "sgRoot", 11111111L);
// write with empty entitiy
@@ -331,7 +374,7 @@ public class SchemaFileTest {
}
// update to entity
- i = 1000;
+ i = 100;
while (i >= 0) {
long addr = getSegAddrInContainer(sgNode.getChild("dev_" + i));
IMNode aDevice = new EntityMNode(sgNode, "dev_" + i);
@@ -383,18 +426,19 @@ public class SchemaFileTest {
}
Set<String> resName = new HashSet<>();
- // more measurement
+ // fill resName set with more measurements
for (IMNode etn : sgNode.getChildren().values()) {
- int j = 1000;
+ int j = 50;
while (j >= 0) {
addMeasurementChild(etn, String.format("mtc2_%d_%d", i, j));
- if (resName.size() < 101) {
+ if (Math.random() > 0.5) {
resName.add(String.format("mtc2_%d_%d", i, j));
}
j--;
}
}
+ // fill arbitraryNode list
orderedTree = getTreeBFT(sgNode);
sf = SchemaFile.loadSchemaFile(sgNode.getName(), TEST_SCHEMA_REGION_ID);
List<IMNode> arbitraryNode = new ArrayList<>();
@@ -403,7 +447,7 @@ public class SchemaFileTest {
node = orderedTree.next();
if (!node.isMeasurement() && !node.isStorageGroup()) {
sf.writeMNode(node);
- if (arbitraryNode.size() < 50) {
+ if (Math.random() > 0.5) {
arbitraryNode.add(node);
}
}
@@ -417,14 +461,17 @@ public class SchemaFileTest {
sf = SchemaFile.loadSchemaFile("sgRoot", TEST_SCHEMA_REGION_ID);
+ // verify alias of random measurement
for (String key : resName) {
- IMNode resNode = sf.getChildNode(arbitraryNode.get(arbitraryNode.size() - 3), key);
+ IMNode resNode =
+ sf.getChildNode(arbitraryNode.get((int) (arbitraryNode.size() * Math.random())), key);
Assert.assertTrue(
resNode.getAsMeasurementMNode().getAlias().equals(resNode.getName() + "alias"));
}
- Iterator<IMNode> res = sf.getChildren(arbitraryNode.get(arbitraryNode.size() - 1));
- int i2 = 0;
+ // verify children subset of random entity node
+ Iterator<IMNode> res =
+ sf.getChildren(arbitraryNode.get((int) (arbitraryNode.size() * Math.random())));
while (res.hasNext()) {
resName.remove(res.next().getName());
}
@@ -593,11 +640,11 @@ public class SchemaFileTest {
}
@Test
- public void test200KAlias() throws Exception {
+ public void test2KAlias() throws Exception {
ISchemaFile sf = SchemaFile.initSchemaFile("root.sg", TEST_SCHEMA_REGION_ID);
IMNode sgNode = new StorageGroupMNode(null, "mma", 111111111L);
// 5 devices, each for 200k measurements
- int factor20K = 20000;
+ int factor2K = 2000;
List<IMNode> devs = new ArrayList<>();
List<List> senList = new ArrayList<>();
Map<String, String> aliasAns = new HashMap<>();
@@ -610,7 +657,7 @@ public class SchemaFileTest {
for (IMNode dev : devs) {
List<IMNode> sens = new ArrayList<>();
- for (int i = 0; i < factor20K; i++) {
+ for (int i = 0; i < factor2K; i++) {
sens.add(getMeasurementNode(dev, "s_" + i, null));
dev.addChild(sens.get(i));
@@ -662,7 +709,7 @@ public class SchemaFileTest {
cnt++;
children.next();
}
- Assert.assertEquals(factor20K, cnt);
+ Assert.assertEquals(factor2K, cnt);
} finally {
sf.close();