You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@iotdb.apache.org by GitBox <gi...@apache.org> on 2022/12/28 08:05:58 UTC

[GitHub] [iotdb] MarcosZyk commented on a diff in pull request #8619: [IOTDB-5291] Add TiFile to Tag Model, support disk flush and query

MarcosZyk commented on code in PR #8619:
URL: https://github.com/apache/iotdb/pull/8619#discussion_r1058009958


##########
schema-engine-tag/src/assembly/resources/conf/schema-tag.properties:
##########
@@ -30,4 +30,28 @@
 
 # Datatype: int
 # How many device ids a memtable can insert, beyond which the memtable will become immutable
-# num_of_deviceIds_in_memTable = 65536
\ No newline at end of file
+# num_of_deviceIds_in_memTable = 65536
+
+# Datatype: int
+# How many immutable memtables can be kept in memory; when this threshold is exceeded, a flush is triggered
+# num_of_immutable_memTable = 5
+
+# Datatype: long
+# Max size of a chunk flushed to disk (unit: byte). A chunk larger than this is split into multiple chunks.
+# max_chunk_size = 1024*1024

Review Comment:
   1 MB seems too large for a single disk I/O. Change the default value to 16 KB or 4 KB, which are the sizes widely used in most DBMSs.



##########
schema-engine-tag/src/main/java/org/apache/iotdb/db/metadata/tagSchemaRegion/tagIndex/flush/MemTableFlush.java:
##########
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.metadata.tagSchemaRegion.tagIndex.flush;
+
+import org.apache.iotdb.db.metadata.tagSchemaRegion.tagIndex.file.entry.TiFileHeader;
+import org.apache.iotdb.db.metadata.tagSchemaRegion.tagIndex.memtable.MemChunk;
+import org.apache.iotdb.db.metadata.tagSchemaRegion.tagIndex.memtable.MemChunkGroup;
+import org.apache.iotdb.db.metadata.tagSchemaRegion.tagIndex.memtable.MemTable;
+import org.apache.iotdb.db.metadata.tagSchemaRegion.tagIndex.request.FlushRequest;
+import org.apache.iotdb.db.metadata.tagSchemaRegion.tagIndex.response.FlushResponse;
+import org.apache.iotdb.lsm.annotation.FlushProcessor;
+import org.apache.iotdb.lsm.context.requestcontext.FlushRequestContext;
+import org.apache.iotdb.lsm.levelProcess.FlushLevelProcessor;
+import org.apache.iotdb.lsm.sstable.bplustree.writer.BPlusTreeWriter;
+import org.apache.iotdb.lsm.sstable.fileIO.FileOutput;
+import org.apache.iotdb.lsm.util.BloomFilter;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** flush for MemTable */
+@FlushProcessor(level = 0)
+public class MemTableFlush extends FlushLevelProcessor<MemTable, MemChunkGroup, FlushRequest> {
+  @Override
+  public List<MemChunkGroup> getChildren(
+      MemTable memNode, FlushRequest request, FlushRequestContext context) {
+    return new ArrayList<>(memNode.getMemChunkGroupMap().values());
+  }
+
+  @Override
+  public void flush(MemTable memNode, FlushRequest flushRequest, FlushRequestContext context)
+      throws IOException {
+    List<MemChunkGroup> memChunkGroups = getChildren(memNode, null, context);
+    Map<MemChunkGroup, String> memChunkGroupMapReverse =
+        memNode.getMemChunkGroupMap().entrySet().stream()
+            .collect(HashMap::new, (m, v) -> m.put(v.getValue(), v.getKey()), HashMap::putAll);
+    Map<String, Long> tagKeyToOffset = new HashMap<>();
+    FlushResponse flushResponse = context.getResponse();
+    for (MemChunkGroup memChunkGroup : memChunkGroups) {
+      tagKeyToOffset.put(
+          memChunkGroupMapReverse.get(memChunkGroup), flushResponse.getTagKeyOffset(memChunkGroup));
+    }
+    FileOutput fileOutput = context.getFileOutput();
+    BPlusTreeWriter bPlusTreeWriter = new BPlusTreeWriter(fileOutput);
+    TiFileHeader tiFileHeader = new TiFileHeader();
+    tiFileHeader.setTagKeyIndexOffset(bPlusTreeWriter.write(tagKeyToOffset, false));
+    List<String> tagKeyAndValues = getTagKeyAndValues(memNode);
+    BloomFilter bloomFilter = BloomFilter.getEmptyBloomFilter(0.05, 3);
+    for (String key : tagKeyAndValues) {
+      bloomFilter.add(key);
+    }
+    tiFileHeader.setBloomFilterOffset(fileOutput.write(bloomFilter));
+    fileOutput.write(tiFileHeader);
+    fileOutput.flush();
+    if (memNode.getDeletionList() != null && memNode.getDeletionList().size() != 0) {
+      flushDeletionList(memNode, flushRequest, context);
+    }
+  }
+
+  private void flushDeletionList(
+      MemTable memNode, FlushRequest flushRequest, FlushRequestContext context) throws IOException {
+    File deletionFile =
+        new File(flushRequest.getFlushDirPath(), flushRequest.getFlushDeletionFileName());
+    if (!deletionFile.exists()) {
+      deletionFile.createNewFile();
+    }
+    FileOutput fileOutput = new FileOutput(deletionFile);
+    for (Integer deletion : memNode.getDeletionList()) {
+      fileOutput.write(deletion);
+    }
+    fileOutput.flush();
+    fileOutput.close();
+  }
+
+  private List<String> getTagKeyAndValues(MemTable memNode) {
+    List<String> tagKeyAndValues = new ArrayList<>();
+    for (Map.Entry<String, MemChunkGroup> entry : memNode.getMemChunkGroupMap().entrySet()) {
+      String tagKey = entry.getKey();
+      for (Map.Entry<String, MemChunk> tagValueEntry :
+          entry.getValue().getMemChunkMap().entrySet()) {
+        String tagValue = tagValueEntry.getKey();
+        tagKeyAndValues.add(tagKey + tagValue);

Review Comment:
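   Maybe we need a reserved separator, like `.`, between the tag key and the tag value to avoid ambiguity: without one, key `ab` with value `c` and key `a` with value `bc` both become `abc` in the bloom filter.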



##########
schema-engine-tag/src/main/java/org/apache/iotdb/lsm/levelProcess/FlushLevelProcessor.java:
##########
@@ -20,19 +20,26 @@
 
 import org.apache.iotdb.lsm.context.requestcontext.FlushRequestContext;
 
+import java.io.IOException;
+
 /** indicates the flush method of each layer of memory nodes */
-public abstract class FlushLevelProcessor<I, O>
-    extends BasicLevelProcessor<I, O, Object, FlushRequestContext> {
+public abstract class FlushLevelProcessor<I, O, R>
+    extends BasicLevelProcessor<I, O, R, FlushRequestContext> {
 
   /**
    * the flush method of memory node
    *
    * @param memNode memory node
    * @param context flush request context
    */
-  public abstract void flush(I memNode, FlushRequestContext context);
+  public abstract void flush(I memNode, R flushRequest, FlushRequestContext context)
+      throws IOException;
 
-  public void handle(I memNode, Object request, FlushRequestContext context) {
-    flush(memNode, context);
+  public void handle(I memNode, R request, FlushRequestContext context) {
+    try {
+      flush(memNode, request, context);
+    } catch (IOException e) {
+      throw new RuntimeException(e);

Review Comment:
   Make sure that when this RuntimeException is thrown, the corresponding resources (e.g. the file outputs being written) are released.



##########
schema-engine-tag/src/main/java/org/apache/iotdb/db/metadata/tagSchemaRegion/tagIndex/flush/MemChunkFlush.java:
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.metadata.tagSchemaRegion.tagIndex.flush;
+
+import org.apache.iotdb.db.metadata.tagSchemaRegion.tagIndex.file.entry.Chunk;
+import org.apache.iotdb.db.metadata.tagSchemaRegion.tagIndex.file.entry.ChunkIndex;
+import org.apache.iotdb.db.metadata.tagSchemaRegion.tagIndex.file.entry.ChunkIndexEntry;
+import org.apache.iotdb.db.metadata.tagSchemaRegion.tagIndex.file.entry.ChunkIndexHeader;
+import org.apache.iotdb.db.metadata.tagSchemaRegion.tagIndex.memtable.MemChunk;
+import org.apache.iotdb.db.metadata.tagSchemaRegion.tagIndex.request.FlushRequest;
+import org.apache.iotdb.db.metadata.tagSchemaRegion.tagIndex.response.FlushResponse;
+import org.apache.iotdb.db.metadata.tagSchemaRegion.utils.ConvertUtils;
+import org.apache.iotdb.lsm.annotation.FlushProcessor;
+import org.apache.iotdb.lsm.context.requestcontext.FlushRequestContext;
+import org.apache.iotdb.lsm.levelProcess.FlushLevelProcessor;
+import org.apache.iotdb.lsm.sstable.fileIO.FileOutput;
+
+import org.roaringbitmap.RoaringBitmap;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/** flush for MemChunk */
+@FlushProcessor(level = 2)
+public class MemChunkFlush extends FlushLevelProcessor<MemChunk, Object, FlushRequest> {
+
+  @Override
+  public List<Object> getChildren(
+      MemChunk memNode, FlushRequest request, FlushRequestContext context) {
+    return null;
+  }
+
+  @Override
+  public void flush(MemChunk memNode, FlushRequest request, FlushRequestContext context)
+      throws IOException {
+    FileOutput fileOutput = context.getFileOutput();
+    List<RoaringBitmap> roaringBitmaps = sliceMemChunk(memNode, request.getChunkMaxSize());
+    ChunkIndex chunkIndex = new ChunkIndex();
+    int count = 0;
+    for (RoaringBitmap roaringBitmap : roaringBitmaps) {
+      Chunk chunk = ConvertUtils.getChunkFromRoaringBitMap(roaringBitmap);
+      byte[] bytes = new byte[chunk.getChunkHeader().getSize()];
+      roaringBitmap.serialize(ByteBuffer.wrap(bytes));
+      fileOutput.write(bytes);
+      ChunkIndexEntry chunkIndexEntry =
+          ConvertUtils.getChunkIndexEntryFromRoaringBitMap(roaringBitmap);
+      chunkIndexEntry.setOffset(fileOutput.write(chunk.getChunkHeader()));
+      chunkIndex.getChunkIndexEntries().add(chunkIndexEntry);
+      count++;
+    }
+    chunkIndex.setChunkIndexHeader(new ChunkIndexHeader(count));
+    long offset = fileOutput.write(chunkIndex);
+    FlushResponse response = context.getResponse();
+    response.addChunkOffset(memNode, offset);
+  }
+
+  private List<RoaringBitmap> sliceMemChunk(MemChunk memNode, long chunkMaxSize) {
+    RoaringBitmap roaringBitmap = memNode.getRoaringBitmap();
+    int originalSize = roaringBitmap.serializedSizeInBytes();
+    int sliceNum = (int) (originalSize / chunkMaxSize) + 1;
+    if (sliceNum == 1) {
+      return Collections.singletonList(roaringBitmap);
+    } else {
+      List<RoaringBitmap> roaringBitmaps = new ArrayList<>();
+      for (int i = 0; i < sliceNum; i++) {
+        roaringBitmaps.add(new RoaringBitmap());
+      }
+      int[] results = roaringBitmap.stream().toArray();
+      int quotient = results.length % sliceNum;
+      int count = results.length / sliceNum;
+      int index = 0;
+      for (int i = 0; i < results.length; i++) {
+        int gap = index < quotient ? count + 1 : count;
+        int start = i;
+        for (; start <= i + gap; start++) {
+          roaringBitmaps.get(index).add(results[start]);
+        }
+        i = start;
+        index++;
+      }
+      return roaringBitmaps;

Review Comment:
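   It seems the last few roaringBitmaps may be empty. Each outer iteration adds `gap + 1` elements (`start <= i + gap`), and the loop's `i++` after `i = start` skips one more element. Elements are therefore dropped, fewer than `sliceNum` slices may be filled, and `results[start]` can run past the end of the array.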



##########
schema-engine-tag/src/assembly/resources/conf/schema-tag.properties:
##########
@@ -30,4 +30,28 @@
 
 # Datatype: int
 # How many device ids a memtable can insert, beyond which the memtable will become immutable
-# num_of_deviceIds_in_memTable = 65536
\ No newline at end of file
+# num_of_deviceIds_in_memTable = 65536
+
+# Datatype: int
+# How many immutable memtables can be kept in memory; when this threshold is exceeded, a flush is triggered
+# num_of_immutable_memTable = 5
+
+# Datatype: long
+# Max size of a chunk flushed to disk (unit: byte). A chunk larger than this is split into multiple chunks.
+# max_chunk_size = 1024*1024
+
+# Datatype: int
+# The size of the buffer used to write a record (unit: byte).
+# out_buffer_size = 1024*1024
+
+####################
+### B+ tree Configuration
+####################
+
+# Datatype: int
+# The size of a B+ tree page (unit: byte).
+# b+tree_page_size = 4*1024
+
+# Datatype: int
+# Degree of a b+ tree.
+# degree = 250

Review Comment:
   If 250 entries have to fit in one 4 KB page, each entry can take only about 16 bytes (4096 / 250 ≈ 16.4). This parameter needs more consideration.



##########
schema-engine-tag/src/main/java/org/apache/iotdb/lsm/request/IFlushRequest.java:
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.lsm.request;
+
+import org.apache.iotdb.lsm.context.requestcontext.RequestContext;
+
+import java.util.List;
+
+/** Represents a flush request that can be processed by the lsm framework */
+public class IFlushRequest<K, V, T> implements IRequest<K, V> {

Review Comment:
   Make this class an interface (as its `I` prefix suggests), and add a separate class as its implementation.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@iotdb.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org