You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ma...@apache.org on 2022/08/31 12:40:25 UTC

[iotdb] branch IOTDB-4251 updated: temp

This is an automated email from the ASF dual-hosted git repository.

marklau99 pushed a commit to branch IOTDB-4251
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/IOTDB-4251 by this push:
     new de3923ab4a temp
de3923ab4a is described below

commit de3923ab4a5c15e728a69ef2d241edb1795ff438
Author: LiuXuxin <li...@outlook.com>
AuthorDate: Wed Aug 31 20:40:17 2022 +0800

    temp
---
 .../tsfile/file/metadata/AlignedChunkMetadata.java |  5 ++
 .../iotdb/tsfile/file/metadata/ChunkMetadata.java  | 35 ++++++++
 .../iotdb/tsfile/file/metadata/IChunkMetadata.java |  2 +
 .../write/writer/MemoryControlTsFileIOWriter.java  | 96 ++++++++++++++++++++++
 .../iotdb/tsfile/write/writer/TsFileIOWriter.java  | 34 +++++---
 5 files changed, 162 insertions(+), 10 deletions(-)

diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/AlignedChunkMetadata.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/AlignedChunkMetadata.java
index a69e61b9b8..606fe76429 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/AlignedChunkMetadata.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/AlignedChunkMetadata.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.read.common.TimeRange;
 import org.apache.iotdb.tsfile.read.controller.IChunkLoader;
 
+import java.io.IOException;
 import java.io.OutputStream;
 import java.util.List;
 
@@ -183,6 +184,10 @@ public class AlignedChunkMetadata implements IChunkMetadata {
     throw new UnsupportedOperationException("VectorChunkMetadata doesn't support serial method");
   }
 
+  /**
+   * Placeholder implementation: nothing is written to {@code outputStream} and zero is returned.
+   *
+   * @param outputStream stream that would receive the serialized form; not touched here
+   * @param seriesFullPath full path of the series; ignored here
+   * @return always 0
+   * @throws IOException declared for the interface contract; this implementation does not throw it
+   */
+  @Override
+  public int serializeWithFullInfo(OutputStream outputStream, String seriesFullPath)
+      throws IOException {
+    return 0;
+  }
+
   @Override
   public byte getMask() {
     return 0;
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetadata.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetadata.java
index 831f8cd120..8c1fab686a 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetadata.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetadata.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.read.common.TimeRange;
 import org.apache.iotdb.tsfile.read.controller.IChunkLoader;
+import org.apache.iotdb.tsfile.utils.Binary;
 import org.apache.iotdb.tsfile.utils.FilePathUtils;
 import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.iotdb.tsfile.utils.RamUsageEstimator;
@@ -161,6 +162,40 @@ public class ChunkMetadata implements IChunkMetadata {
     return byteLen;
   }
 
+  /**
+   * Serializes this chunk metadata together with its series path and data type.
+   *
+   * <p>Written in this order: the series full path (wrapped in a {@link Binary}), the data type,
+   * then the output of {@link #serializeTo(OutputStream, boolean)} with statistics enabled.
+   *
+   * @param outputStream stream to write to
+   * @param seriesFullPath the full path of the series this chunk metadata belongs to
+   * @return the sum of the lengths reported by the individual writes
+   * @throws IOException if one of the underlying writes fails
+   */
+  @Override
+  public int serializeWithFullInfo(OutputStream outputStream, String seriesFullPath)
+      throws IOException {
+    int byteLen = ReadWriteIOUtils.write(new Binary(seriesFullPath), outputStream);
+    byteLen += ReadWriteIOUtils.write(tsDataType, outputStream);
+    byteLen += serializeTo(outputStream, true);
+    return byteLen;
+  }
+
+  /**
+   * Reads a series path, a data type, a chunk header offset and statistics from {@code buffer},
+   * in that order, and stores everything except the path into {@code chunkMetadata}.
+   *
+   * <p>NOTE(review): this is presumably the inverse of {@code serializeWithFullInfo}; confirm
+   * against the layout written by {@code serializeTo(outputStream, true)}.
+   *
+   * @param buffer source buffer, read sequentially
+   * @param chunkMetadata instance whose data type, chunk header offset and statistics are set
+   * @return the series full path read from the buffer
+   * @throws IOException declared by this method; presumably raised by the read helpers
+   */
+  public static String deserializeWithFullInfo(ByteBuffer buffer, ChunkMetadata chunkMetadata)
+      throws IOException {
+    // The reads below must stay in this order: the fields are laid out back to back.
+    final String seriesPath = ReadWriteIOUtils.readBinary(buffer).toString();
+
+    final byte typeCode = ReadWriteIOUtils.readByte(buffer);
+    chunkMetadata.tsDataType = TSDataType.deserialize(typeCode);
+
+    chunkMetadata.offsetOfChunkHeader = ReadWriteIOUtils.readLong(buffer);
+    chunkMetadata.statistics = Statistics.deserialize(buffer, chunkMetadata.tsDataType);
+
+    return seriesPath;
+  }
+
   /**
    * deserialize from ByteBuffer.
    *
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/IChunkMetadata.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/IChunkMetadata.java
index 1cc819fd52..9d8c7a8b80 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/IChunkMetadata.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/IChunkMetadata.java
@@ -73,5 +73,7 @@ public interface IChunkMetadata {
 
   int serializeTo(OutputStream outputStream, boolean serializeStatistic) throws IOException;
 
+  /**
+   * Serializes this chunk metadata in a form that also carries the series it belongs to.
+   *
+   * <p>In this change, {@code ChunkMetadata} writes the series path, the data type and then its
+   * regular serialized form with statistics, while {@code AlignedChunkMetadata} writes nothing
+   * and returns 0.
+   *
+   * @param outputStream stream to write to
+   * @param seriesFullPath full path of the series this chunk metadata belongs to
+   * @return the length reported by the implementation (presumably the number of bytes written)
+   * @throws IOException if writing fails
+   */
+  int serializeWithFullInfo(OutputStream outputStream, String seriesFullPath) throws IOException;
+
   byte getMask();
 }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/MemoryControlTsFileIOWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/MemoryControlTsFileIOWriter.java
new file mode 100644
index 0000000000..fd00260ff9
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/MemoryControlTsFileIOWriter.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.tsfile.write.writer;
+
+import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A {@link TsFileIOWriter} that keeps a running estimate of the RAM held by chunk metadata and,
+ * once that estimate exceeds {@code maxMetadataSize}, writes the chunk metadata (grouped by
+ * series) to a temporary file whose path is the TsFile's absolute path with {@value
+ * #CHUNK_METADATA_TEMP_FILE_PREFIX} appended.
+ *
+ * <p>NOTE(review): work in progress. The size estimate is only ever increased, nothing in this
+ * class releases the in-memory metadata after a dump, the temp output is never closed, and
+ * {@link #endFile()} is currently a no-op.
+ */
+public class MemoryControlTsFileIOWriter extends TsFileIOWriter {
+  private static final Logger LOG = LoggerFactory.getLogger(MemoryControlTsFileIOWriter.class);
+
+  /** Appended to the TsFile path; despite the name it is used as a suffix. */
+  public static final String CHUNK_METADATA_TEMP_FILE_PREFIX = ".cmt";
+
+  /** Marker byte written before the chunk metadata of a series whose data type is VECTOR. */
+  private static final byte VECTOR_TYPE = 1;
+  /** Marker byte written before the chunk metadata of any other series. */
+  private static final byte NORMAL_TYPE = 2;
+
+  /** Threshold that {@link #currentChunkMetadataSize} must exceed to trigger a dump. */
+  private final long maxMetadataSize;
+  /** Sum of {@code calculateRamSize()} over every chunk ended so far. */
+  private long currentChunkMetadataSize = 0L;
+  private final File chunkMetadataTempFile;
+  /** Created lazily on the first dump. */
+  private LocalTsFileOutput tempOutput;
+  /** Position in the temp file at which each dump starts, in dump order. */
+  private final List<Long> sortedSegmentPosition = new ArrayList<>();
+
+  /**
+   * @param file the TsFile to write
+   * @param maxMetadataSize size estimate above which chunk metadata is dumped to the temp file
+   * @throws IOException if the parent writer cannot be created
+   */
+  public MemoryControlTsFileIOWriter(File file, long maxMetadataSize) throws IOException {
+    super(file);
+    this.maxMetadataSize = maxMetadataSize;
+    this.chunkMetadataTempFile = new File(file.getAbsoluteFile() + CHUNK_METADATA_TEMP_FILE_PREFIX);
+  }
+
+  /**
+   * Ends the current chunk, adds its estimated RAM size to the running total and dumps the chunk
+   * metadata to the temp file when the total exceeds the configured maximum.
+   *
+   * <p>An {@link IOException} raised by the dump is logged and not propagated.
+   */
+  @Override
+  public void endCurrentChunk() {
+    // Must be read before delegating: the parent clears currentChunkMetadata.
+    currentChunkMetadataSize += currentChunkMetadata.calculateRamSize();
+    super.endCurrentChunk();
+    if (currentChunkMetadataSize > maxMetadataSize) {
+      // NOTE(review): the counter is not reset here, so once the threshold has been crossed
+      // every later chunk triggers another dump -- confirm whether that is intended.
+      try {
+        sortAndFlushChunkMetadata();
+      } catch (IOException e) {
+        LOG.error("Meets exception when flushing metadata to temp files", e);
+      }
+    }
+  }
+
+  /**
+   * Writes the chunk metadata, grouped by series, to the temp file and records the position at
+   * which this dump starts.
+   *
+   * <p>Per series, a type marker is written first; for a VECTOR series it is followed by the
+   * number of chunk metadata entries of that series. The entries themselves follow, each
+   * serialized with statistics.
+   *
+   * @throws IOException if the temp file cannot be opened or written
+   */
+  private void sortAndFlushChunkMetadata() throws IOException {
+    Map<Path, List<IChunkMetadata>> chunkMetadataListMap = groupChunkMetadataListBySeries();
+    if (tempOutput == null) {
+      tempOutput = new LocalTsFileOutput(new FileOutputStream(chunkMetadataTempFile));
+    }
+    sortedSegmentPosition.add(tempOutput.getPosition());
+    for (Map.Entry<Path, List<IChunkMetadata>> entry : chunkMetadataListMap.entrySet()) {
+      List<IChunkMetadata> iChunkMetadataList = entry.getValue();
+      boolean isVector =
+          !iChunkMetadataList.isEmpty()
+              && iChunkMetadataList.get(0).getDataType() == TSDataType.VECTOR;
+      if (isVector) {
+        ReadWriteIOUtils.write(VECTOR_TYPE, tempOutput);
+        // Count of this series' entries, not of the parent's in-progress chunkMetadataList.
+        ReadWriteIOUtils.write(iChunkMetadataList.size(), tempOutput);
+      } else {
+        ReadWriteIOUtils.write(NORMAL_TYPE, tempOutput);
+      }
+      for (IChunkMetadata chunkMetadata : iChunkMetadataList) {
+        chunkMetadata.serializeTo(tempOutput, true);
+      }
+    }
+  }
+
+  /**
+   * Currently does nothing; the parent's {@code endFile()} is deliberately not invoked.
+   *
+   * <p>NOTE(review): as written, the file is never finalized through this writer and {@code
+   * tempOutput} is left open.
+   */
+  @Override
+  public void endFile() {
+    // intentionally empty for now
+  }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
index 2f865f297f..59a8ec236d 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
@@ -76,7 +76,7 @@ public class TsFileIOWriter implements AutoCloseable {
   protected File file;
 
   // current flushed Chunk
-  private ChunkMetadata currentChunkMetadata;
+  protected ChunkMetadata currentChunkMetadata;
   // current flushed ChunkGroup
   protected List<ChunkMetadata> chunkMetadataList = new ArrayList<>();
   // all flushed ChunkGroups
@@ -240,6 +240,28 @@ public class TsFileIOWriter implements AutoCloseable {
     currentChunkMetadata = null;
   }
 
+  /**
+   * Groups chunk metadata by series, where a series is the {@link Path} built from a device and
+   * a measurement id.
+   *
+   * <p>Covers the chunk groups in {@code chunkGroupMetadataList} as well as the chunks in {@code
+   * chunkMetadataList}, which are attributed to {@code currentChunkGroupDeviceId}. Every chunk is
+   * filed under its own measurement id.
+   *
+   * @return a {@link TreeMap} from series to its chunk metadata; within a series the entries
+   *     keep the order in which they were encountered
+   */
+  protected Map<Path, List<IChunkMetadata>> groupChunkMetadataListBySeries() {
+    // group ChunkMetadata by series
+    Map<Path, List<IChunkMetadata>> chunkMetadataListMap = new TreeMap<>();
+
+    // chunk groups that are already complete
+    for (ChunkGroupMetadata chunkGroupMetadata : chunkGroupMetadataList) {
+      for (IChunkMetadata chunkMetadata : chunkGroupMetadata.getChunkMetadataList()) {
+        Path series = new Path(chunkGroupMetadata.getDevice(), chunkMetadata.getMeasurementUid());
+        chunkMetadataListMap.computeIfAbsent(series, k -> new ArrayList<>()).add(chunkMetadata);
+      }
+    }
+
+    // chunks of the current chunk group: each one goes to its own series rather than all of
+    // them being filed under the first chunk's measurement
+    if (chunkMetadataList != null) {
+      for (ChunkMetadata chunkMetadata : chunkMetadataList) {
+        Path series = new Path(currentChunkGroupDeviceId, chunkMetadata.getMeasurementUid());
+        chunkMetadataListMap.computeIfAbsent(series, k -> new ArrayList<>()).add(chunkMetadata);
+      }
+    }
+    return chunkMetadataListMap;
+  }
+
   /**
    * write {@linkplain TsFileMetadata TSFileMetaData} to output stream and close it.
    *
@@ -253,15 +275,7 @@ public class TsFileIOWriter implements AutoCloseable {
     ReadWriteIOUtils.write(MetaMarker.SEPARATOR, out.wrapAsStream());
 
     // group ChunkMetadata by series
-    Map<Path, List<IChunkMetadata>> chunkMetadataListMap = new TreeMap<>();
-
-    for (ChunkGroupMetadata chunkGroupMetadata : chunkGroupMetadataList) {
-      List<ChunkMetadata> chunkMetadatas = chunkGroupMetadata.getChunkMetadataList();
-      for (IChunkMetadata chunkMetadata : chunkMetadatas) {
-        Path series = new Path(chunkGroupMetadata.getDevice(), chunkMetadata.getMeasurementUid());
-        chunkMetadataListMap.computeIfAbsent(series, k -> new ArrayList<>()).add(chunkMetadata);
-      }
-    }
+    Map<Path, List<IChunkMetadata>> chunkMetadataListMap = groupChunkMetadataListBySeries();
 
     MetadataIndexNode metadataIndex = flushMetadataIndex(chunkMetadataListMap);
     TsFileMetadata tsFileMetaData = new TsFileMetadata();