You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@iotdb.apache.org by GitBox <gi...@apache.org> on 2022/09/13 12:00:37 UTC

[GitHub] [iotdb] JackieTien97 commented on a diff in pull request #7276: [IOTDB-4251] Persist ChunkMetadata in TsFileIOWriter ahead of time to save memory

JackieTien97 commented on code in PR #7276:
URL: https://github.com/apache/iotdb/pull/7276#discussion_r969493416


##########
tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/MemoryControlTsFileIOWriter.java:
##########
@@ -0,0 +1,395 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.tsfile.write.writer;
+
+import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+import org.apache.iotdb.tsfile.file.MetaMarker;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.MetadataIndexEntry;
+import org.apache.iotdb.tsfile.file.metadata.MetadataIndexNode;
+import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
+import org.apache.iotdb.tsfile.file.metadata.TsFileMetadata;
+import org.apache.iotdb.tsfile.file.metadata.enums.MetadataIndexNodeType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.reader.LocalTsFileInput;
+import org.apache.iotdb.tsfile.utils.BloomFilter;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import org.apache.commons.io.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.TreeMap;
+
+import static org.apache.iotdb.tsfile.file.metadata.MetadataIndexConstructor.addCurrentIndexNodeToQueue;
+import static org.apache.iotdb.tsfile.file.metadata.MetadataIndexConstructor.checkAndBuildLevelIndex;
+import static org.apache.iotdb.tsfile.file.metadata.MetadataIndexConstructor.generateRootNode;
+
+/**
+ * This writer control the total size of chunk metadata to avoid OOM when writing massive
+ * timeseries. <b>This writer can only be used in the scenarios where the chunk is written in
+ * order.</b> The order means lexicographical order and time order. The lexicographical order
+ * requires that, if the writer is going to write a series <i>S</i>, all data of the all series
+ * smaller than <i>S</i> in lexicographical order has been written to the writer. The time order
+ * requires that, for a single series <i>S</i>, if the writer is going to write a chunk <i>C</i> of
+ * it, all chunks of <i>S</i> whose start time is smaller than <i>C</i> should have been written to
+ * the writer. If you do not comply with the above requirements, metadata index tree may be
+ * generated incorrectly. As a result, the file cannot be queried correctly.
+ */
+public class MemoryControlTsFileIOWriter extends TsFileIOWriter {
+  private static final Logger LOG = LoggerFactory.getLogger(MemoryControlTsFileIOWriter.class);
+  protected long maxMetadataSize;
+  protected long currentChunkMetadataSize = 0L;
+  protected File chunkMetadataTempFile;
+  protected LocalTsFileOutput tempOutput;
+  protected volatile boolean hasChunkMetadataInDisk = false;
+  protected String currentSeries = null;
+  // record the total num of path in order to make bloom filter
+  protected int pathCount = 0;
+  Path lastSerializePath = null;
+
+  public static final String CHUNK_METADATA_TEMP_FILE_SUFFIX = ".cmt";
+
+  public MemoryControlTsFileIOWriter(File file, long maxMetadataSize) throws IOException {
+    super(file);
+    this.maxMetadataSize = maxMetadataSize;
+    this.chunkMetadataTempFile = new File(file.getAbsoluteFile() + CHUNK_METADATA_TEMP_FILE_SUFFIX);
+  }
+
+  @Override
+  public void endCurrentChunk() {
+    currentChunkMetadataSize += currentChunkMetadata.calculateRamSize();
+    super.endCurrentChunk();
+  }
+
+  /**
+   * Check if the size of chunk metadata in memory is greater than the given threshold. If so, the
+   * chunk metadata will be written to a temp files. <b>Notice! If you are writing a aligned device,
+   * you should make sure all data of current writing device has been written before this method is
+   * called.</b> For not aligned series, there is no such limitation.
+   *
+   * @throws IOException
+   */
+  public void checkMetadataSizeAndMayFlush() throws IOException {
+    // This function should be called after all data of an aligned device has been written
+    if (currentChunkMetadataSize > maxMetadataSize) {
+      try {
+        sortAndFlushChunkMetadata();
+      } catch (IOException e) {
+        LOG.error("Meets exception when flushing metadata to temp file for {}", file, e);
+        throw e;
+      }
+    }
+  }
+
+  /**
+   * Sort the chunk metadata by the lexicographical order and the start time of the chunk, then
+   * flush them to a temp file.
+   *
+   * @throws IOException
+   */
+  protected void sortAndFlushChunkMetadata() throws IOException {
+    // group by series
+    Map<Path, List<IChunkMetadata>> chunkMetadataListMap = groupChunkMetadataListBySeries();
+    if (tempOutput == null) {
+      tempOutput = new LocalTsFileOutput(new FileOutputStream(chunkMetadataTempFile));
+    }
+    hasChunkMetadataInDisk = true;
+    // the file structure in temp file will be
+    // chunkSize | chunkBuffer
+    for (Map.Entry<Path, List<IChunkMetadata>> entry : chunkMetadataListMap.entrySet()) {
+      Path seriesPath = entry.getKey();
+      if (!seriesPath.equals(lastSerializePath)) {
+        // record the count of path to construct bloom filter later
+        pathCount++;
+      }
+      List<IChunkMetadata> iChunkMetadataList = entry.getValue();
+      writeChunkMetadata(iChunkMetadataList, seriesPath, tempOutput);
+      lastSerializePath = seriesPath;
+    }
+    // clear the cache metadata to release the memory
+    chunkGroupMetadataList.clear();
+    if (chunkMetadataList != null) {
+      chunkMetadataList.clear();
+    }
+  }
+
+  private void writeChunkMetadata(
+      List<IChunkMetadata> iChunkMetadataList, Path seriesPath, LocalTsFileOutput output)
+      throws IOException {
+    for (IChunkMetadata chunkMetadata : iChunkMetadataList) {
+      PublicBAOS buffer = new PublicBAOS();
+      int size = chunkMetadata.serializeWithFullInfo(buffer, seriesPath.getFullPath());
+      ReadWriteIOUtils.write(size, output);
+      buffer.writeTo(output);
+    }
+  }
+
+  @Override
+  public void endFile() throws IOException {
+    if (!hasChunkMetadataInDisk) {
+      // all the chunk metadata is stored in memory
+      // sort the chunk metadata, construct the index tree
+      // and just close the file
+      super.endFile();
+      return;
+    }
+
+    // there is some chunk metadata already been written to the disk
+    // first we should flush the remaining chunk metadata in memory to disk
+    // then read the persisted chunk metadata from disk
+    sortAndFlushChunkMetadata();
+    tempOutput.close();
+
+    // read in the chunk metadata, and construct the index tree
+    readChunkMetadataAndConstructIndexTree();
+
+    // write magic string
+    out.write(MAGIC_STRING_BYTES);
+
+    // close file
+    out.close();
+    canWrite = false;
+    FileUtils.delete(new File(file + CHUNK_METADATA_TEMP_FILE_SUFFIX));
+  }
+
+  private void readChunkMetadataAndConstructIndexTree() throws IOException {
+    tempOutput.close();
+    long metaOffset = out.getPosition();
+
+    // serialize the SEPARATOR of MetaData
+    ReadWriteIOUtils.write(MetaMarker.SEPARATOR, out.wrapAsStream());
+
+    ChunkMetadataReadIterator iterator =
+        new ChunkMetadataReadIterator(
+            0,
+            chunkMetadataTempFile.length(),
+            new LocalTsFileInput(chunkMetadataTempFile.toPath()));
+    Map<String, MetadataIndexNode> deviceMetadataIndexMap = new TreeMap<>();
+    Queue<MetadataIndexNode> measurementMetadataIndexQueue = new ArrayDeque<>();
+    String currentDevice = null;
+    String prevDevice = null;
+    MetadataIndexNode currentIndexNode =
+        new MetadataIndexNode(MetadataIndexNodeType.LEAF_MEASUREMENT);
+    TSFileConfig config = TSFileDescriptor.getInstance().getConfig();
+    int seriesIdxForCurrDevice = 0;
+    BloomFilter filter =
+        BloomFilter.getEmptyBloomFilter(
+            TSFileDescriptor.getInstance().getConfig().getBloomFilterErrorRate(), pathCount);
+
+    int indexCount = 0;
+    while (iterator.hasNextChunkMetadata()) {
+      // read in all chunk metadata of one series
+      // construct the timeseries metadata for this series
+      TimeseriesMetadata timeseriesMetadata = readTimeseriesMetadata(iterator);
+
+      indexCount++;
+      // build bloom filter
+      filter.add(currentSeries);
+      // construct the index tree node for the series
+      Path currentPath = null;
+      if (timeseriesMetadata.getTSDataType() == TSDataType.VECTOR) {
+        // this series is the time column of the aligned device
+        // the full series path will be like "root.sg.d."
+        // we remove the last . in the series id here
+        currentPath = new Path(currentSeries);
+        currentDevice = currentSeries.substring(0, currentSeries.length() - 1);
+      } else {
+        currentPath = new Path(currentSeries, true);
+        currentDevice = currentPath.getDevice();
+      }
+      if (!currentDevice.equals(prevDevice)) {
+        if (prevDevice != null) {
+          addCurrentIndexNodeToQueue(currentIndexNode, measurementMetadataIndexQueue, out);
+          deviceMetadataIndexMap.put(
+              prevDevice,
+              generateRootNode(
+                  measurementMetadataIndexQueue, out, MetadataIndexNodeType.INTERNAL_MEASUREMENT));
+          currentIndexNode = new MetadataIndexNode(MetadataIndexNodeType.LEAF_MEASUREMENT);
+        }
+        measurementMetadataIndexQueue = new ArrayDeque<>();
+        seriesIdxForCurrDevice = 0;
+      }
+
+      if (seriesIdxForCurrDevice % config.getMaxDegreeOfIndexNode() == 0) {
+        if (currentIndexNode.isFull()) {
+          addCurrentIndexNodeToQueue(currentIndexNode, measurementMetadataIndexQueue, out);
+          currentIndexNode = new MetadataIndexNode(MetadataIndexNodeType.LEAF_MEASUREMENT);
+        }
+        if (timeseriesMetadata.getTSDataType() != TSDataType.VECTOR) {
+          currentIndexNode.addEntry(
+              new MetadataIndexEntry(currentPath.getMeasurement(), out.getPosition()));
+        } else {
+          currentIndexNode.addEntry(new MetadataIndexEntry("", out.getPosition()));
+        }
+      }
+
+      prevDevice = currentDevice;
+      seriesIdxForCurrDevice++;
+      // serialize the timeseries metadata to file
+      timeseriesMetadata.serializeTo(out.wrapAsStream());
+    }
+
+    addCurrentIndexNodeToQueue(currentIndexNode, measurementMetadataIndexQueue, out);
+    deviceMetadataIndexMap.put(
+        prevDevice,
+        generateRootNode(
+            measurementMetadataIndexQueue, out, MetadataIndexNodeType.INTERNAL_MEASUREMENT));
+
+    if (indexCount != pathCount) {
+      throw new IOException(
+          String.format(
+              "Expected path count is %d, index path count is %d", pathCount, indexCount));
+    }
+
+    MetadataIndexNode metadataIndex = checkAndBuildLevelIndex(deviceMetadataIndexMap, out);
+
+    TsFileMetadata tsFileMetadata = new TsFileMetadata();
+    tsFileMetadata.setMetadataIndex(metadataIndex);
+    tsFileMetadata.setMetaOffset(metaOffset);
+
+    int size = tsFileMetadata.serializeTo(out.wrapAsStream());
+    size += tsFileMetadata.serializeBloomFilter(out.wrapAsStream(), filter);
+
+    // write TsFileMetaData size
+    ReadWriteIOUtils.write(size, out.wrapAsStream());
+  }
+
+  /**
+   * Read in all the chunk metadata for a series, and construct a TimeseriesMetadata for it
+   *
+   * @param iterator
+   * @return
+   * @throws IOException
+   */
+  private TimeseriesMetadata readTimeseriesMetadata(ChunkMetadataReadIterator iterator)
+      throws IOException {
+    List<IChunkMetadata> iChunkMetadataList = new ArrayList<>();
+    currentSeries = iterator.getAllChunkMetadataForNextSeries(iChunkMetadataList);
+    TimeseriesMetadata timeseriesMetadata =
+        super.constructOneTimeseriesMetadata(new Path(currentSeries), iChunkMetadataList);
+    if (timeseriesMetadata.getTSDataType() == TSDataType.VECTOR) {
+      // set empty measurement id for time column
+      timeseriesMetadata.setMeasurementId("");
+    } else {
+      timeseriesMetadata.setMeasurementId(new Path(currentSeries, true).getMeasurement());
+    }
+    return timeseriesMetadata;
+  }
+
+  @Override
+  public void close() throws IOException {
+    super.close();
+    if (tempOutput != null) {
+      this.tempOutput.close();
+    }
+  }
+
+  protected class ChunkMetadataReadIterator {
+
+    final LocalTsFileInput input;
+    final long startPosition;
+    final long endPosition;
+    final ByteBuffer sizeBuffer = ByteBuffer.allocate(4);
+    final ByteBuffer typeBuffer = ByteBuffer.allocate(1);

Review Comment:
   ```suggestion
   ```



##########
tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetadata.java:
##########
@@ -161,6 +162,40 @@ public int serializeTo(OutputStream outputStream, boolean serializeStatistic) th
     return byteLen;
   }
 
+  /**
+   * Serialize the chunk metadata with full path, data type and statistic
+   *
+   * @param outputStream OutputStream
+   * @param seriesFullPath the full path of the chunk metadata
+   * @return length
+   * @throws IOException
+   */
+  public int serializeWithFullInfo(OutputStream outputStream, String seriesFullPath)
+      throws IOException {
+    int byteLen = 0;
+    byteLen += ReadWriteIOUtils.write(new Binary(seriesFullPath), outputStream);
+    byteLen += ReadWriteIOUtils.write(tsDataType, outputStream);
+    byteLen += this.serializeTo(outputStream, true);
+    return byteLen;
+  }
+
+  /**
+   * Deserialize with full info, the result is store in param chunkMetadata
+   *
+   * @param buffer ByteBuffer
+   * @param chunkMetadata ChunkMetadata to store the result
+   * @return the full path of the measurement
+   * @throws IOException
+   */
+  public static String deserializeWithFullInfo(ByteBuffer buffer, ChunkMetadata chunkMetadata)
+      throws IOException {
+    String fullPath = ReadWriteIOUtils.readBinary(buffer).toString();

Review Comment:
   ```suggestion
       String fullPath = ReadWriteIOUtils.readString(buffer).toString();
   ```



##########
tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetadata.java:
##########
@@ -161,6 +162,40 @@ public int serializeTo(OutputStream outputStream, boolean serializeStatistic) th
     return byteLen;
   }
 
+  /**
+   * Serialize the chunk metadata with full path, data type and statistic
+   *
+   * @param outputStream OutputStream
+   * @param seriesFullPath the full path of the chunk metadata
+   * @return length
+   * @throws IOException
+   */
+  public int serializeWithFullInfo(OutputStream outputStream, String seriesFullPath)
+      throws IOException {
+    int byteLen = 0;
+    byteLen += ReadWriteIOUtils.write(new Binary(seriesFullPath), outputStream);

Review Comment:
   ```suggestion
       byteLen += ReadWriteIOUtils.write(seriesFullPath, outputStream);
   ```



##########
server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/InnerSpaceCompactionWriter.java:
##########
@@ -18,24 +18,35 @@
  */
 package org.apache.iotdb.db.engine.compaction.writer;
 
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.rescon.SystemInfo;
 import org.apache.iotdb.tsfile.read.common.block.column.Column;
 import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
 import org.apache.iotdb.tsfile.write.chunk.AlignedChunkWriterImpl;
+import org.apache.iotdb.tsfile.write.writer.MemoryControlTsFileIOWriter;
 import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
 
 import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
 
 public class InnerSpaceCompactionWriter extends AbstractCompactionWriter {
-  private TsFileIOWriter fileWriter;
+  private MemoryControlTsFileIOWriter fileWriter;
 
   private boolean isEmptyFile;
+  private TsFileResource resource;
 
   public InnerSpaceCompactionWriter(TsFileResource targetFileResource) throws IOException {
-    this.fileWriter = new TsFileIOWriter(targetFileResource.getTsFile());
+    long sizeForFileWriter =
+        SystemInfo.getInstance().getMemorySizeForCompaction()
+            / IoTDBDescriptor.getInstance().getConfig().getConcurrentCompactionThread()
+            * 5
+            / 100L;

Review Comment:
   same as above



##########
server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/CrossSpaceCompactionWriter.java:
##########
@@ -57,11 +61,20 @@ public class CrossSpaceCompactionWriter extends AbstractCompactionWriter {
   public CrossSpaceCompactionWriter(
       List<TsFileResource> targetResources, List<TsFileResource> seqFileResources)
       throws IOException {
+    this.targetTsFileResources = targetResources;
     currentDeviceEndTime = new long[seqFileResources.size()];
     isEmptyFile = new boolean[seqFileResources.size()];
     isDeviceExistedInTargetFiles = new boolean[targetResources.size()];
+    long memorySizeForEachWriter =
+        SystemInfo.getInstance().getMemorySizeForCompaction()
+            / IoTDBDescriptor.getInstance().getConfig().getConcurrentCompactionThread()
+            * 5
+            / 100L
+            / targetResources.size();

Review Comment:
   extract the same calculation way into a function in util class.



##########
tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/MemoryControlTsFileIOWriter.java:
##########
@@ -0,0 +1,395 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.tsfile.write.writer;
+
+import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+import org.apache.iotdb.tsfile.file.MetaMarker;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.MetadataIndexEntry;
+import org.apache.iotdb.tsfile.file.metadata.MetadataIndexNode;
+import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
+import org.apache.iotdb.tsfile.file.metadata.TsFileMetadata;
+import org.apache.iotdb.tsfile.file.metadata.enums.MetadataIndexNodeType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.reader.LocalTsFileInput;
+import org.apache.iotdb.tsfile.utils.BloomFilter;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import org.apache.commons.io.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.TreeMap;
+
+import static org.apache.iotdb.tsfile.file.metadata.MetadataIndexConstructor.addCurrentIndexNodeToQueue;
+import static org.apache.iotdb.tsfile.file.metadata.MetadataIndexConstructor.checkAndBuildLevelIndex;
+import static org.apache.iotdb.tsfile.file.metadata.MetadataIndexConstructor.generateRootNode;
+
+/**
+ * This writer control the total size of chunk metadata to avoid OOM when writing massive
+ * timeseries. <b>This writer can only be used in the scenarios where the chunk is written in
+ * order.</b> The order means lexicographical order and time order. The lexicographical order
+ * requires that, if the writer is going to write a series <i>S</i>, all data of the all series
+ * smaller than <i>S</i> in lexicographical order has been written to the writer. The time order
+ * requires that, for a single series <i>S</i>, if the writer is going to write a chunk <i>C</i> of
+ * it, all chunks of <i>S</i> whose start time is smaller than <i>C</i> should have been written to
+ * the writer. If you do not comply with the above requirements, metadata index tree may be
+ * generated incorrectly. As a result, the file cannot be queried correctly.
+ */
+public class MemoryControlTsFileIOWriter extends TsFileIOWriter {
+  private static final Logger LOG = LoggerFactory.getLogger(MemoryControlTsFileIOWriter.class);
+  protected long maxMetadataSize;
+  protected long currentChunkMetadataSize = 0L;
+  protected File chunkMetadataTempFile;

Review Comment:
   And while do restart recovery, we should also remember to deal with this temp file, whether to safely delete this or something else.



##########
tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/MemoryControlTsFileIOWriter.java:
##########
@@ -0,0 +1,395 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.tsfile.write.writer;
+
+import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+import org.apache.iotdb.tsfile.file.MetaMarker;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.MetadataIndexEntry;
+import org.apache.iotdb.tsfile.file.metadata.MetadataIndexNode;
+import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
+import org.apache.iotdb.tsfile.file.metadata.TsFileMetadata;
+import org.apache.iotdb.tsfile.file.metadata.enums.MetadataIndexNodeType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.reader.LocalTsFileInput;
+import org.apache.iotdb.tsfile.utils.BloomFilter;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import org.apache.commons.io.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.TreeMap;
+
+import static org.apache.iotdb.tsfile.file.metadata.MetadataIndexConstructor.addCurrentIndexNodeToQueue;
+import static org.apache.iotdb.tsfile.file.metadata.MetadataIndexConstructor.checkAndBuildLevelIndex;
+import static org.apache.iotdb.tsfile.file.metadata.MetadataIndexConstructor.generateRootNode;
+
+/**
+ * This writer control the total size of chunk metadata to avoid OOM when writing massive
+ * timeseries. <b>This writer can only be used in the scenarios where the chunk is written in
+ * order.</b> The order means lexicographical order and time order. The lexicographical order
+ * requires that, if the writer is going to write a series <i>S</i>, all data of the all series
+ * smaller than <i>S</i> in lexicographical order has been written to the writer. The time order
+ * requires that, for a single series <i>S</i>, if the writer is going to write a chunk <i>C</i> of
+ * it, all chunks of <i>S</i> whose start time is smaller than <i>C</i> should have been written to
+ * the writer. If you do not comply with the above requirements, metadata index tree may be
+ * generated incorrectly. As a result, the file cannot be queried correctly.
+ */
+public class MemoryControlTsFileIOWriter extends TsFileIOWriter {
+  private static final Logger LOG = LoggerFactory.getLogger(MemoryControlTsFileIOWriter.class);
+  protected long maxMetadataSize;
+  protected long currentChunkMetadataSize = 0L;
+  protected File chunkMetadataTempFile;

Review Comment:
   It seems that there is no where to delete this temp file.



##########
tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java:
##########
@@ -325,9 +344,10 @@ private MetadataIndexNode flushMetadataIndex(Map<Path, List<IChunkMetadata>> chu
    *
    * @param path Path of chunk
    * @param chunkMetadataList List of chunkMetadata about path(previous param)
+   * @return the constructed TimeseriesMetadata
    */
-  private void flushOneChunkMetadata(Path path, List<IChunkMetadata> chunkMetadataList)
-      throws IOException {
+  protected TimeseriesMetadata constructOneTimeseriesMetadata(
+      Path path, List<IChunkMetadata> chunkMetadataList) throws IOException {

Review Comment:
   ```suggestion
         List<IChunkMetadata> chunkMetadataList) throws IOException {
   ```
   We don't need `Path`, we can get it from `IChunkMetadata.getMeasuremntId()`;



##########
tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/AlignedChunkMetadata.java:
##########
@@ -183,6 +183,10 @@ public int serializeTo(OutputStream outputStream, boolean serializeStatistic) {
     throw new UnsupportedOperationException("VectorChunkMetadata doesn't support serial method");
   }
 
+  public int serializeWithFullInfo(OutputStream outputStream, String seriesFullPath) {

Review Comment:
   ```suggestion
     @Override
     public int serializeWithFullInfo(OutputStream outputStream, String seriesFullPath) {
   ```



##########
tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/MemoryControlTsFileIOWriter.java:
##########
@@ -0,0 +1,395 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.tsfile.write.writer;
+
+import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+import org.apache.iotdb.tsfile.file.MetaMarker;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.MetadataIndexEntry;
+import org.apache.iotdb.tsfile.file.metadata.MetadataIndexNode;
+import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
+import org.apache.iotdb.tsfile.file.metadata.TsFileMetadata;
+import org.apache.iotdb.tsfile.file.metadata.enums.MetadataIndexNodeType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.reader.LocalTsFileInput;
+import org.apache.iotdb.tsfile.utils.BloomFilter;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import org.apache.commons.io.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.TreeMap;
+
+import static org.apache.iotdb.tsfile.file.metadata.MetadataIndexConstructor.addCurrentIndexNodeToQueue;
+import static org.apache.iotdb.tsfile.file.metadata.MetadataIndexConstructor.checkAndBuildLevelIndex;
+import static org.apache.iotdb.tsfile.file.metadata.MetadataIndexConstructor.generateRootNode;
+
+/**
+ * This writer control the total size of chunk metadata to avoid OOM when writing massive
+ * timeseries. <b>This writer can only be used in the scenarios where the chunk is written in
+ * order.</b> The order means lexicographical order and time order. The lexicographical order
+ * requires that, if the writer is going to write a series <i>S</i>, all data of the all series
+ * smaller than <i>S</i> in lexicographical order has been written to the writer. The time order
+ * requires that, for a single series <i>S</i>, if the writer is going to write a chunk <i>C</i> of
+ * it, all chunks of <i>S</i> whose start time is smaller than <i>C</i> should have been written to
+ * the writer. If you do not comply with the above requirements, metadata index tree may be
+ * generated incorrectly. As a result, the file cannot be queried correctly.
+ */
+public class MemoryControlTsFileIOWriter extends TsFileIOWriter {
+  private static final Logger LOG = LoggerFactory.getLogger(MemoryControlTsFileIOWriter.class);
+  protected long maxMetadataSize;
+  protected long currentChunkMetadataSize = 0L;
+  protected File chunkMetadataTempFile;
+  protected LocalTsFileOutput tempOutput;
+  protected volatile boolean hasChunkMetadataInDisk = false;
+  protected String currentSeries = null;
+  // record the total num of path in order to make bloom filter
+  protected int pathCount = 0;
+  Path lastSerializePath = null;
+
+  public static final String CHUNK_METADATA_TEMP_FILE_SUFFIX = ".cmt";
+
+  public MemoryControlTsFileIOWriter(File file, long maxMetadataSize) throws IOException {
+    super(file);
+    this.maxMetadataSize = maxMetadataSize;
+    this.chunkMetadataTempFile = new File(file.getAbsoluteFile() + CHUNK_METADATA_TEMP_FILE_SUFFIX);
+  }
+
+  @Override
+  public void endCurrentChunk() {
+    currentChunkMetadataSize += currentChunkMetadata.calculateRamSize();
+    super.endCurrentChunk();
+  }
+
+  /**
+   * Check if the size of chunk metadata in memory is greater than the given threshold. If so, the
+   * chunk metadata will be written to a temp files. <b>Notice! If you are writing a aligned device,
+   * you should make sure all data of current writing device has been written before this method is
+   * called.</b> For not aligned series, there is no such limitation.
+   *
+   * @throws IOException
+   */
+  public void checkMetadataSizeAndMayFlush() throws IOException {
+    // This function should be called after all data of an aligned device has been written
+    if (currentChunkMetadataSize > maxMetadataSize) {
+      try {
+        sortAndFlushChunkMetadata();
+      } catch (IOException e) {
+        LOG.error("Meets exception when flushing metadata to temp file for {}", file, e);
+        throw e;
+      }
+    }
+  }
+
+  /**
+   * Sort the chunk metadata by the lexicographical order and the start time of the chunk, then
+   * flush them to a temp file.
+   *
+   * @throws IOException
+   */
+  protected void sortAndFlushChunkMetadata() throws IOException {
+    // group by series
+    Map<Path, List<IChunkMetadata>> chunkMetadataListMap = groupChunkMetadataListBySeries();
+    if (tempOutput == null) {
+      tempOutput = new LocalTsFileOutput(new FileOutputStream(chunkMetadataTempFile));
+    }
+    hasChunkMetadataInDisk = true;
+    // the file structure in temp file will be
+    // chunkSize | chunkBuffer
+    for (Map.Entry<Path, List<IChunkMetadata>> entry : chunkMetadataListMap.entrySet()) {
+      Path seriesPath = entry.getKey();
+      if (!seriesPath.equals(lastSerializePath)) {
+        // record the count of path to construct bloom filter later
+        pathCount++;
+      }
+      List<IChunkMetadata> iChunkMetadataList = entry.getValue();
+      writeChunkMetadata(iChunkMetadataList, seriesPath, tempOutput);
+      lastSerializePath = seriesPath;
+    }
+    // clear the cache metadata to release the memory
+    chunkGroupMetadataList.clear();
+    if (chunkMetadataList != null) {
+      chunkMetadataList.clear();
+    }
+  }
+
+  private void writeChunkMetadata(
+      List<IChunkMetadata> iChunkMetadataList, Path seriesPath, LocalTsFileOutput output)
+      throws IOException {
+    for (IChunkMetadata chunkMetadata : iChunkMetadataList) {
+      PublicBAOS buffer = new PublicBAOS();
+      int size = chunkMetadata.serializeWithFullInfo(buffer, seriesPath.getFullPath());
+      ReadWriteIOUtils.write(size, output);
+      buffer.writeTo(output);
+    }
+  }
+
+  @Override
+  public void endFile() throws IOException {
+    if (!hasChunkMetadataInDisk) {
+      // all the chunk metadata is stored in memory
+      // sort the chunk metadata, construct the index tree
+      // and just close the file
+      super.endFile();
+      return;
+    }
+
+    // there is some chunk metadata already been written to the disk
+    // first we should flush the remaining chunk metadata in memory to disk
+    // then read the persisted chunk metadata from disk
+    sortAndFlushChunkMetadata();

Review Comment:
   It also need to be discussed, we shouldn't waste to spill the remaining chunkmetadata into disk.



##########
tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/MemoryControlTsFileIOWriter.java:
##########
@@ -0,0 +1,395 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.tsfile.write.writer;
+
+import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+import org.apache.iotdb.tsfile.file.MetaMarker;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.MetadataIndexEntry;
+import org.apache.iotdb.tsfile.file.metadata.MetadataIndexNode;
+import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
+import org.apache.iotdb.tsfile.file.metadata.TsFileMetadata;
+import org.apache.iotdb.tsfile.file.metadata.enums.MetadataIndexNodeType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.reader.LocalTsFileInput;
+import org.apache.iotdb.tsfile.utils.BloomFilter;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import org.apache.commons.io.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.TreeMap;
+
+import static org.apache.iotdb.tsfile.file.metadata.MetadataIndexConstructor.addCurrentIndexNodeToQueue;
+import static org.apache.iotdb.tsfile.file.metadata.MetadataIndexConstructor.checkAndBuildLevelIndex;
+import static org.apache.iotdb.tsfile.file.metadata.MetadataIndexConstructor.generateRootNode;
+
+/**
+ * This writer control the total size of chunk metadata to avoid OOM when writing massive
+ * timeseries. <b>This writer can only be used in the scenarios where the chunk is written in
+ * order.</b> The order means lexicographical order and time order. The lexicographical order
+ * requires that, if the writer is going to write a series <i>S</i>, all data of the all series
+ * smaller than <i>S</i> in lexicographical order has been written to the writer. The time order
+ * requires that, for a single series <i>S</i>, if the writer is going to write a chunk <i>C</i> of
+ * it, all chunks of <i>S</i> whose start time is smaller than <i>C</i> should have been written to
+ * the writer. If you do not comply with the above requirements, metadata index tree may be
+ * generated incorrectly. As a result, the file cannot be queried correctly.
+ */
+public class MemoryControlTsFileIOWriter extends TsFileIOWriter {
+  private static final Logger LOG = LoggerFactory.getLogger(MemoryControlTsFileIOWriter.class);
+  protected long maxMetadataSize;
+  protected long currentChunkMetadataSize = 0L;
+  protected File chunkMetadataTempFile;
+  protected LocalTsFileOutput tempOutput;
+  protected volatile boolean hasChunkMetadataInDisk = false;
+  protected String currentSeries = null;
+  // record the total num of path in order to make bloom filter
+  protected int pathCount = 0;
+  Path lastSerializePath = null;
+
+  public static final String CHUNK_METADATA_TEMP_FILE_SUFFIX = ".cmt";
+
+  public MemoryControlTsFileIOWriter(File file, long maxMetadataSize) throws IOException {
+    super(file);
+    this.maxMetadataSize = maxMetadataSize;
+    this.chunkMetadataTempFile = new File(file.getAbsoluteFile() + CHUNK_METADATA_TEMP_FILE_SUFFIX);
+  }
+
+  @Override
+  public void endCurrentChunk() {
+    currentChunkMetadataSize += currentChunkMetadata.calculateRamSize();
+    super.endCurrentChunk();
+  }
+
+  /**
+   * Check if the size of chunk metadata in memory is greater than the given threshold. If so, the
+   * chunk metadata will be written to a temp files. <b>Notice! If you are writing a aligned device,
+   * you should make sure all data of current writing device has been written before this method is
+   * called.</b> For not aligned series, there is no such limitation.
+   *
+   * @throws IOException
+   */
+  public void checkMetadataSizeAndMayFlush() throws IOException {
+    // This function should be called after all data of an aligned device has been written
+    if (currentChunkMetadataSize > maxMetadataSize) {
+      try {
+        sortAndFlushChunkMetadata();
+      } catch (IOException e) {
+        LOG.error("Meets exception when flushing metadata to temp file for {}", file, e);
+        throw e;
+      }
+    }
+  }
+
+  /**
+   * Sort the chunk metadata by the lexicographical order and the start time of the chunk, then
+   * flush them to a temp file.
+   *
+   * @throws IOException
+   */
+  protected void sortAndFlushChunkMetadata() throws IOException {
+    // group by series
+    Map<Path, List<IChunkMetadata>> chunkMetadataListMap = groupChunkMetadataListBySeries();
+    if (tempOutput == null) {
+      tempOutput = new LocalTsFileOutput(new FileOutputStream(chunkMetadataTempFile));
+    }
+    hasChunkMetadataInDisk = true;
+    // the file structure in temp file will be
+    // chunkSize | chunkBuffer
+    for (Map.Entry<Path, List<IChunkMetadata>> entry : chunkMetadataListMap.entrySet()) {
+      Path seriesPath = entry.getKey();
+      if (!seriesPath.equals(lastSerializePath)) {
+        // record the count of path to construct bloom filter later
+        pathCount++;
+      }
+      List<IChunkMetadata> iChunkMetadataList = entry.getValue();
+      writeChunkMetadata(iChunkMetadataList, seriesPath, tempOutput);
+      lastSerializePath = seriesPath;
+    }
+    // clear the cache metadata to release the memory
+    chunkGroupMetadataList.clear();
+    if (chunkMetadataList != null) {
+      chunkMetadataList.clear();
+    }
+  }
+
+  private void writeChunkMetadata(
+      List<IChunkMetadata> iChunkMetadataList, Path seriesPath, LocalTsFileOutput output)
+      throws IOException {
+    for (IChunkMetadata chunkMetadata : iChunkMetadataList) {
+      PublicBAOS buffer = new PublicBAOS();
+      int size = chunkMetadata.serializeWithFullInfo(buffer, seriesPath.getFullPath());
+      ReadWriteIOUtils.write(size, output);
+      buffer.writeTo(output);
+    }
+  }
+
+  @Override
+  public void endFile() throws IOException {
+    if (!hasChunkMetadataInDisk) {
+      // all the chunk metadata is stored in memory
+      // sort the chunk metadata, construct the index tree
+      // and just close the file
+      super.endFile();
+      return;
+    }
+
+    // there is some chunk metadata already been written to the disk
+    // first we should flush the remaining chunk metadata in memory to disk
+    // then read the persisted chunk metadata from disk
+    sortAndFlushChunkMetadata();
+    tempOutput.close();
+
+    // read in the chunk metadata, and construct the index tree
+    readChunkMetadataAndConstructIndexTree();
+
+    // write magic string
+    out.write(MAGIC_STRING_BYTES);
+
+    // close file
+    out.close();
+    canWrite = false;
+    FileUtils.delete(new File(file + CHUNK_METADATA_TEMP_FILE_SUFFIX));
+  }
+
+  private void readChunkMetadataAndConstructIndexTree() throws IOException {
+    tempOutput.close();
+    long metaOffset = out.getPosition();
+
+    // serialize the SEPARATOR of MetaData
+    ReadWriteIOUtils.write(MetaMarker.SEPARATOR, out.wrapAsStream());
+
+    ChunkMetadataReadIterator iterator =
+        new ChunkMetadataReadIterator(
+            0,
+            chunkMetadataTempFile.length(),
+            new LocalTsFileInput(chunkMetadataTempFile.toPath()));
+    Map<String, MetadataIndexNode> deviceMetadataIndexMap = new TreeMap<>();

Review Comment:
   Shouldn't it be already sorted before spilling into disk?



##########
tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/MemoryControlTsFileIOWriter.java:
##########
@@ -0,0 +1,395 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.tsfile.write.writer;
+
+import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+import org.apache.iotdb.tsfile.file.MetaMarker;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.MetadataIndexEntry;
+import org.apache.iotdb.tsfile.file.metadata.MetadataIndexNode;
+import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
+import org.apache.iotdb.tsfile.file.metadata.TsFileMetadata;
+import org.apache.iotdb.tsfile.file.metadata.enums.MetadataIndexNodeType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.reader.LocalTsFileInput;
+import org.apache.iotdb.tsfile.utils.BloomFilter;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import org.apache.commons.io.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.TreeMap;
+
+import static org.apache.iotdb.tsfile.file.metadata.MetadataIndexConstructor.addCurrentIndexNodeToQueue;
+import static org.apache.iotdb.tsfile.file.metadata.MetadataIndexConstructor.checkAndBuildLevelIndex;
+import static org.apache.iotdb.tsfile.file.metadata.MetadataIndexConstructor.generateRootNode;
+
+/**
+ * This writer control the total size of chunk metadata to avoid OOM when writing massive
+ * timeseries. <b>This writer can only be used in the scenarios where the chunk is written in
+ * order.</b> The order means lexicographical order and time order. The lexicographical order
+ * requires that, if the writer is going to write a series <i>S</i>, all data of the all series
+ * smaller than <i>S</i> in lexicographical order has been written to the writer. The time order
+ * requires that, for a single series <i>S</i>, if the writer is going to write a chunk <i>C</i> of
+ * it, all chunks of <i>S</i> whose start time is smaller than <i>C</i> should have been written to
+ * the writer. If you do not comply with the above requirements, metadata index tree may be
+ * generated incorrectly. As a result, the file cannot be queried correctly.
+ */
+public class MemoryControlTsFileIOWriter extends TsFileIOWriter {
+  private static final Logger LOG = LoggerFactory.getLogger(MemoryControlTsFileIOWriter.class);
+  protected long maxMetadataSize;
+  protected long currentChunkMetadataSize = 0L;
+  protected File chunkMetadataTempFile;
+  protected LocalTsFileOutput tempOutput;
+  protected volatile boolean hasChunkMetadataInDisk = false;
+  protected String currentSeries = null;
+  // record the total num of path in order to make bloom filter
+  protected int pathCount = 0;
+  Path lastSerializePath = null;
+
+  public static final String CHUNK_METADATA_TEMP_FILE_SUFFIX = ".cmt";
+
+  public MemoryControlTsFileIOWriter(File file, long maxMetadataSize) throws IOException {
+    super(file);
+    this.maxMetadataSize = maxMetadataSize;
+    this.chunkMetadataTempFile = new File(file.getAbsoluteFile() + CHUNK_METADATA_TEMP_FILE_SUFFIX);
+  }
+
+  @Override
+  public void endCurrentChunk() {
+    currentChunkMetadataSize += currentChunkMetadata.calculateRamSize();
+    super.endCurrentChunk();
+  }
+
+  /**
+   * Check if the size of chunk metadata in memory is greater than the given threshold. If so, the
+   * chunk metadata will be written to a temp files. <b>Notice! If you are writing a aligned device,
+   * you should make sure all data of current writing device has been written before this method is
+   * called.</b> For not aligned series, there is no such limitation.
+   *
+   * @throws IOException
+   */
+  public void checkMetadataSizeAndMayFlush() throws IOException {
+    // This function should be called after all data of an aligned device has been written
+    if (currentChunkMetadataSize > maxMetadataSize) {
+      try {
+        sortAndFlushChunkMetadata();
+      } catch (IOException e) {
+        LOG.error("Meets exception when flushing metadata to temp file for {}", file, e);
+        throw e;
+      }
+    }
+  }
+
+  /**
+   * Sort the chunk metadata by the lexicographical order and the start time of the chunk, then
+   * flush them to a temp file.
+   *
+   * @throws IOException
+   */
+  protected void sortAndFlushChunkMetadata() throws IOException {
+    // group by series
+    Map<Path, List<IChunkMetadata>> chunkMetadataListMap = groupChunkMetadataListBySeries();
+    if (tempOutput == null) {
+      tempOutput = new LocalTsFileOutput(new FileOutputStream(chunkMetadataTempFile));
+    }
+    hasChunkMetadataInDisk = true;
+    // the file structure in temp file will be
+    // chunkSize | chunkBuffer
+    for (Map.Entry<Path, List<IChunkMetadata>> entry : chunkMetadataListMap.entrySet()) {
+      Path seriesPath = entry.getKey();
+      if (!seriesPath.equals(lastSerializePath)) {
+        // record the count of path to construct bloom filter later
+        pathCount++;
+      }
+      List<IChunkMetadata> iChunkMetadataList = entry.getValue();
+      writeChunkMetadata(iChunkMetadataList, seriesPath, tempOutput);
+      lastSerializePath = seriesPath;
+    }
+    // clear the cache metadata to release the memory
+    chunkGroupMetadataList.clear();
+    if (chunkMetadataList != null) {
+      chunkMetadataList.clear();
+    }
+  }
+
+  private void writeChunkMetadata(
+      List<IChunkMetadata> iChunkMetadataList, Path seriesPath, LocalTsFileOutput output)
+      throws IOException {
+    for (IChunkMetadata chunkMetadata : iChunkMetadataList) {
+      PublicBAOS buffer = new PublicBAOS();
+      int size = chunkMetadata.serializeWithFullInfo(buffer, seriesPath.getFullPath());
+      ReadWriteIOUtils.write(size, output);
+      buffer.writeTo(output);
+    }
+  }
+
+  @Override
+  public void endFile() throws IOException {

Review Comment:
   We may think more about this function, if already serialize the chunkmetadata into binary stream, can we just copy that  into the end of tsfile intstead of deserializing it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@iotdb.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org