You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ma...@apache.org on 2022/09/06 09:09:38 UTC

[iotdb] branch IOTDB-4251 updated: finish memory control tsfile io writer

This is an automated email from the ASF dual-hosted git repository.

marklau99 pushed a commit to branch IOTDB-4251
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/IOTDB-4251 by this push:
     new 1afdabe328 finish memory control tsfile io writer
1afdabe328 is described below

commit 1afdabe328399e5c6d474ed277d51a5d99625cf6
Author: Liu Xuxin <li...@outlook.com>
AuthorDate: Tue Sep 6 17:09:28 2022 +0800

    finish memory control tsfile io writer
---
 .../file/metadata/MetadataIndexConstructor.java    |   4 +-
 .../tsfile/file/metadata/MetadataIndexNode.java    |   2 +-
 .../iotdb/tsfile/file/metadata/TsFileMetadata.java |   9 +-
 .../write/writer/MemoryControlTsFileIOWriter.java  | 123 ++++++++++++++++++++-
 .../iotdb/tsfile/write/writer/TsFileIOWriter.java  |   4 +-
 5 files changed, 132 insertions(+), 10 deletions(-)

diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexConstructor.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexConstructor.java
index 062ffd6183..beb9ddb3e5 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexConstructor.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexConstructor.java
@@ -123,7 +123,7 @@ public class MetadataIndexConstructor {
    * @param out tsfile output
    * @param type MetadataIndexNode type
    */
-  private static MetadataIndexNode generateRootNode(
+  public static MetadataIndexNode generateRootNode(
       Queue<MetadataIndexNode> metadataIndexNodeQueue, TsFileOutput out, MetadataIndexNodeType type)
       throws IOException {
     int queueSize = metadataIndexNodeQueue.size();
@@ -148,7 +148,7 @@ public class MetadataIndexConstructor {
     return metadataIndexNodeQueue.poll();
   }
 
-  private static void addCurrentIndexNodeToQueue(
+  public static void addCurrentIndexNodeToQueue(
       MetadataIndexNode currentIndexNode,
       Queue<MetadataIndexNode> metadataIndexNodeQueue,
       TsFileOutput out)
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexNode.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexNode.java
index 3f6f6336b3..1d3972cafe 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexNode.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexNode.java
@@ -74,7 +74,7 @@ public class MetadataIndexNode {
     this.children.add(metadataIndexEntry);
   }
 
-  boolean isFull() {
+  public boolean isFull() {
     return children.size() >= config.getMaxDegreeOfIndexNode();
   }
 
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/TsFileMetadata.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/TsFileMetadata.java
index 95e01e2da1..f6f974fc1a 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/TsFileMetadata.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/TsFileMetadata.java
@@ -105,10 +105,15 @@ public class TsFileMetadata {
    * @param outputStream -output stream to determine byte length
    * @return -byte length
    */
-  public int serializeBloomFilter(OutputStream outputStream, Set<Path> paths) throws IOException {
-    int byteLen = 0;
+  public int buildAndSerializeBloomFilter(OutputStream outputStream, Set<Path> paths)
+      throws IOException {
     BloomFilter filter = buildBloomFilter(paths);
+    return serializeBloomFilter(outputStream, filter);
+  }
 
+  public int serializeBloomFilter(OutputStream outputStream, BloomFilter filter)
+      throws IOException {
+    int byteLen = 0;
     byte[] bytes = filter.serialize();
     byteLen += ReadWriteForEncodingUtils.writeUnsignedVarInt(bytes.length, outputStream);
     outputStream.write(bytes);
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/MemoryControlTsFileIOWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/MemoryControlTsFileIOWriter.java
index 371f4d2ae7..acd0ba1aa4 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/MemoryControlTsFileIOWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/MemoryControlTsFileIOWriter.java
@@ -19,14 +19,21 @@
 
 package org.apache.iotdb.tsfile.write.writer;
 
+import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
 import org.apache.iotdb.tsfile.file.MetaMarker;
 import org.apache.iotdb.tsfile.file.metadata.AlignedChunkMetadata;
 import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
 import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.MetadataIndexEntry;
+import org.apache.iotdb.tsfile.file.metadata.MetadataIndexNode;
 import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
+import org.apache.iotdb.tsfile.file.metadata.TsFileMetadata;
+import org.apache.iotdb.tsfile.file.metadata.enums.MetadataIndexNodeType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.Path;
 import org.apache.iotdb.tsfile.read.reader.LocalTsFileInput;
+import org.apache.iotdb.tsfile.utils.BloomFilter;
 import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.iotdb.tsfile.utils.PublicBAOS;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
@@ -43,6 +50,10 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Queue;
+import java.util.TreeMap;
+
+import static org.apache.iotdb.tsfile.file.metadata.MetadataIndexConstructor.addCurrentIndexNodeToQueue;
+import static org.apache.iotdb.tsfile.file.metadata.MetadataIndexConstructor.generateRootNode;
 
 public class MemoryControlTsFileIOWriter extends TsFileIOWriter {
   private static final Logger LOG = LoggerFactory.getLogger(MemoryControlTsFileIOWriter.class);
@@ -54,6 +65,9 @@ public class MemoryControlTsFileIOWriter extends TsFileIOWriter {
   // it stores the start address of persisted chunk metadata for per series
   protected Queue<Long> segmentForPerSeries = new ArrayDeque<>();
   protected String currentSeries = null;
+  // number of series paths counted while flushing chunk metadata; used to size the bloom filter
+  protected int pathCount = 0;
+  Path lastSerializePath = null;
 
   public static final String CHUNK_METADATA_TEMP_FILE_PREFIX = ".cmt";
   private static final byte VECTOR_TYPE = 1;
@@ -99,8 +113,12 @@ public class MemoryControlTsFileIOWriter extends TsFileIOWriter {
     for (Map.Entry<Path, List<IChunkMetadata>> entry : chunkMetadataListMap.entrySet()) {
       segmentForPerSeries.add(tempOutput.getPosition());
       Path seriesPath = entry.getKey();
+      if (!seriesPath.equals(lastSerializePath)) {
+        pathCount++;
+      }
       List<IChunkMetadata> iChunkMetadataList = entry.getValue();
       writeChunkMetadata(iChunkMetadataList, seriesPath, tempOutput);
+      lastSerializePath = seriesPath;
     }
   }
 
@@ -188,20 +206,117 @@ public class MemoryControlTsFileIOWriter extends TsFileIOWriter {
 
     // serialize the SEPARATOR of MetaData
     ReadWriteIOUtils.write(MetaMarker.SEPARATOR, out.wrapAsStream());
+
     ChunkMetadataReadIterator iterator =
         new ChunkMetadataReadIterator(
             0,
             chunkMetadataTempFile.length(),
             new LocalTsFileInput(chunkMetadataTempFile.toPath()));
+    Map<String, MetadataIndexNode> deviceMetadataIndexMap = new TreeMap<>();
+    Queue<MetadataIndexNode> measurementMetadataIndexQueue = null;
+    String currentDevice = null;
+    String prevDevice = null;
+    MetadataIndexNode currentIndexNode =
+        new MetadataIndexNode(MetadataIndexNodeType.LEAF_MEASUREMENT);
+    TSFileConfig config = TSFileDescriptor.getInstance().getConfig();
+    int seriesIdxForCurrDevice = 0;
+    BloomFilter filter =
+        BloomFilter.getEmptyBloomFilter(
+            TSFileDescriptor.getInstance().getConfig().getBloomFilterErrorRate(), pathCount);
     while (iterator.hasNextChunkMetadata()) {
-      // 1. read in all chunk metadata of one series
-      // 2. construct the timeseries metadata for this series
-      // 3. construct the index tree node for the series
-      // 4. serialize the timeseries metadata to file
+      // read in all chunk metadata of one series
+      // construct the timeseries metadata for this series
       TimeseriesMetadata timeseriesMetadata = readTimeseriesMetadata(iterator);
+      // build bloom filter
+      filter.add(currentSeries);
+      // construct the index tree node for the series
+      currentDevice = new Path(currentSeries).getDevice();
+      if (!currentDevice.equals(prevDevice)) {
+        if (prevDevice != null) {
+          addCurrentIndexNodeToQueue(currentIndexNode, measurementMetadataIndexQueue, out);
+          deviceMetadataIndexMap.put(
+              prevDevice,
+              generateRootNode(
+                  measurementMetadataIndexQueue, out, MetadataIndexNodeType.INTERNAL_MEASUREMENT));
+          currentIndexNode = new MetadataIndexNode(MetadataIndexNodeType.LEAF_MEASUREMENT);
+        }
+        measurementMetadataIndexQueue = new ArrayDeque<>();
+        seriesIdxForCurrDevice = 0;
+      }
+
+      if (seriesIdxForCurrDevice % config.getMaxDegreeOfIndexNode() == 0) {
+        if (currentIndexNode.isFull()) {
+          addCurrentIndexNodeToQueue(currentIndexNode, measurementMetadataIndexQueue, out);
+          currentIndexNode = new MetadataIndexNode(MetadataIndexNodeType.LEAF_MEASUREMENT);
+        }
+        currentIndexNode.addEntry(
+            new MetadataIndexEntry(timeseriesMetadata.getMeasurementId(), out.getPosition()));
+      }
+
+      prevDevice = currentDevice;
+      seriesIdxForCurrDevice++;
+      // serialize the timeseries metadata to file
+      timeseriesMetadata.serializeTo(out.wrapAsStream());
+    }
+
+    MetadataIndexNode metadataIndex = null;
+    // if the number of devices does not exceed the max degree of an index node, use a single
+    // LEAF_DEVICE node whose entries point directly to the measurement index nodes
+    if (deviceMetadataIndexMap.size() <= config.getMaxDegreeOfIndexNode()) {
+      MetadataIndexNode metadataIndexNode =
+          new MetadataIndexNode(MetadataIndexNodeType.LEAF_DEVICE);
+      for (Map.Entry<String, MetadataIndexNode> entry : deviceMetadataIndexMap.entrySet()) {
+        metadataIndexNode.addEntry(new MetadataIndexEntry(entry.getKey(), out.getPosition()));
+        entry.getValue().serializeTo(out.wrapAsStream());
+      }
+      metadataIndexNode.setEndOffset(out.getPosition());
+      metadataIndex = metadataIndexNode;
+    } else {
+      // else, build level index for devices
+      Queue<MetadataIndexNode> deviceMetadataIndexQueue = new ArrayDeque<>();
+      currentIndexNode = new MetadataIndexNode(MetadataIndexNodeType.LEAF_DEVICE);
+
+      for (Map.Entry<String, MetadataIndexNode> entry : deviceMetadataIndexMap.entrySet()) {
+        // when constructing from internal node, each node is related to an entry
+        if (currentIndexNode.isFull()) {
+          addCurrentIndexNodeToQueue(currentIndexNode, deviceMetadataIndexQueue, out);
+          currentIndexNode = new MetadataIndexNode(MetadataIndexNodeType.LEAF_DEVICE);
+        }
+        currentIndexNode.addEntry(new MetadataIndexEntry(entry.getKey(), out.getPosition()));
+        entry.getValue().serializeTo(out.wrapAsStream());
+      }
+      addCurrentIndexNodeToQueue(currentIndexNode, deviceMetadataIndexQueue, out);
+      MetadataIndexNode deviceMetadataIndexNode =
+          generateRootNode(deviceMetadataIndexQueue, out, MetadataIndexNodeType.INTERNAL_DEVICE);
+      deviceMetadataIndexNode.setEndOffset(out.getPosition());
+      metadataIndex = deviceMetadataIndexNode;
     }
+
+    TsFileMetadata tsFileMetadata = new TsFileMetadata();
+    tsFileMetadata.setMetadataIndex(metadataIndex);
+    tsFileMetadata.setMetaOffset(metaOffset);
+
+    int size = tsFileMetadata.serializeTo(out.wrapAsStream());
+    size += tsFileMetadata.serializeBloomFilter(out.wrapAsStream(), filter);
+
+    // write TsFileMetaData size
+    ReadWriteIOUtils.write(size, out.wrapAsStream());
+
+    // write magic string
+    out.write(MAGIC_STRING_BYTES);
+
+    // close file
+    out.close();
+    canWrite = false;
   }
 
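+  /**
+   * Reads all the chunk metadata of one series and constructs a TimeseriesMetadata for it.
+   *
+   * @param iterator iterator over the chunk metadata to read from
+   * @return the TimeseriesMetadata constructed for the series
+   * @throws IOException if an I/O error occurs while reading the chunk metadata
+   */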
   private TimeseriesMetadata readTimeseriesMetadata(ChunkMetadataReadIterator iterator)
       throws IOException {
     Pair<String, IChunkMetadata> currentPair = iterator.getCurrentPair();
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
index c34ab1df63..aba72ce2e4 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
@@ -294,7 +294,9 @@ public class TsFileIOWriter implements AutoCloseable {
     }
 
     // write bloom filter
-    size += tsFileMetaData.serializeBloomFilter(out.wrapAsStream(), chunkMetadataListMap.keySet());
+    size +=
+        tsFileMetaData.buildAndSerializeBloomFilter(
+            out.wrapAsStream(), chunkMetadataListMap.keySet());
     if (logger.isDebugEnabled()) {
       logger.debug("finish flushing the bloom filter file pos:{}", out.getPosition());
     }