You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ma...@apache.org on 2022/08/31 12:40:25 UTC
[iotdb] branch IOTDB-4251 updated: temp
This is an automated email from the ASF dual-hosted git repository.
marklau99 pushed a commit to branch IOTDB-4251
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/IOTDB-4251 by this push:
new de3923ab4a temp
de3923ab4a is described below
commit de3923ab4a5c15e728a69ef2d241edb1795ff438
Author: LiuXuxin <li...@outlook.com>
AuthorDate: Wed Aug 31 20:40:17 2022 +0800
temp
---
.../tsfile/file/metadata/AlignedChunkMetadata.java | 5 ++
.../iotdb/tsfile/file/metadata/ChunkMetadata.java | 35 ++++++++
.../iotdb/tsfile/file/metadata/IChunkMetadata.java | 2 +
.../write/writer/MemoryControlTsFileIOWriter.java | 96 ++++++++++++++++++++++
.../iotdb/tsfile/write/writer/TsFileIOWriter.java | 34 +++++---
5 files changed, 162 insertions(+), 10 deletions(-)
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/AlignedChunkMetadata.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/AlignedChunkMetadata.java
index a69e61b9b8..606fe76429 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/AlignedChunkMetadata.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/AlignedChunkMetadata.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.read.common.TimeRange;
import org.apache.iotdb.tsfile.read.controller.IChunkLoader;
+import java.io.IOException;
import java.io.OutputStream;
import java.util.List;
@@ -183,6 +184,10 @@ public class AlignedChunkMetadata implements IChunkMetadata {
throw new UnsupportedOperationException("VectorChunkMetadata doesn't support serial method");
}
+ public int serializeWithFullInfo(OutputStream outputStream, String seriesFullPath) throws IOException {
+ return 0;
+ }
+
@Override
public byte getMask() {
return 0;
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetadata.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetadata.java
index 831f8cd120..8c1fab686a 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetadata.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetadata.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.read.common.TimeRange;
import org.apache.iotdb.tsfile.read.controller.IChunkLoader;
+import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.utils.FilePathUtils;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.utils.RamUsageEstimator;
@@ -161,6 +162,40 @@ public class ChunkMetadata implements IChunkMetadata {
return byteLen;
}
+ /**
+ * Serialize the chunk metadata with full path, data type and statistic
+ *
+ * @param outputStream OutputStream
+ * @param seriesFullPath the full path of the chunk metadata
+ * @return length
+ * @throws IOException
+ */
+ public int serializeWithFullInfo(OutputStream outputStream, String seriesFullPath)
+ throws IOException {
+ int byteLen = 0;
+ byteLen += ReadWriteIOUtils.write(new Binary(seriesFullPath), outputStream);
+ byteLen += ReadWriteIOUtils.write(tsDataType, outputStream);
+ byteLen += this.serializeTo(outputStream, true);
+ return byteLen;
+ }
+
+ /**
+ * Deserialize with full info, the result is store in param chunkMetadata
+ *
+ * @param buffer ByteBuffer
+ * @param chunkMetadata ChunkMetadata to store the result
+ * @return the full path of the measurement
+ * @throws IOException
+ */
+ public static String deserializeWithFullInfo(ByteBuffer buffer, ChunkMetadata chunkMetadata)
+ throws IOException {
+ String fullPath = ReadWriteIOUtils.readBinary(buffer).toString();
+ chunkMetadata.tsDataType = TSDataType.deserialize(ReadWriteIOUtils.readByte(buffer));
+ chunkMetadata.offsetOfChunkHeader = ReadWriteIOUtils.readLong(buffer);
+ chunkMetadata.statistics = Statistics.deserialize(buffer, chunkMetadata.tsDataType);
+ return fullPath;
+ }
+
/**
* deserialize from ByteBuffer.
*
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/IChunkMetadata.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/IChunkMetadata.java
index 1cc819fd52..9d8c7a8b80 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/IChunkMetadata.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/IChunkMetadata.java
@@ -73,5 +73,7 @@ public interface IChunkMetadata {
int serializeTo(OutputStream outputStream, boolean serializeStatistic) throws IOException;
+ int serializeWithFullInfo(OutputStream outputStream, String seriesFullPath) throws IOException;
+
byte getMask();
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/MemoryControlTsFileIOWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/MemoryControlTsFileIOWriter.java
new file mode 100644
index 0000000000..fd00260ff9
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/MemoryControlTsFileIOWriter.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.tsfile.write.writer;
+
+import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class MemoryControlTsFileIOWriter extends TsFileIOWriter {
+ private static final Logger LOG = LoggerFactory.getLogger(MemoryControlTsFileIOWriter.class);
+ private long maxMetadataSize;
+ private long currentChunkMetadataSize = 0L;
+ private File chunkMetadataTempFile;
+ private LocalTsFileOutput tempOutput;
+ private List<Long> sortedSegmentPosition = new ArrayList<>();
+
+ public static final String CHUNK_METADATA_TEMP_FILE_PREFIX = ".cmt";
+ private static final byte VECTOR_TYPE = 1;
+ private static final byte NORMAL_TYPE = 2;
+
+ public MemoryControlTsFileIOWriter(File file, long maxMetadataSize) throws IOException {
+ super(file);
+ this.maxMetadataSize = maxMetadataSize;
+ this.chunkMetadataTempFile = new File(file.getAbsoluteFile() + CHUNK_METADATA_TEMP_FILE_PREFIX);
+ }
+
+ @Override
+ public void endCurrentChunk() {
+ currentChunkMetadataSize += currentChunkMetadata.calculateRamSize();
+ super.endCurrentChunk();
+ if (currentChunkMetadataSize > maxMetadataSize) {
+ // TODO: Sort and flush the chunk metadata to outside
+ try {
+ sortAndFlushChunkMetadata();
+ } catch (IOException e) {
+ LOG.error("Meets exception when flushing metadata to temp files", e);
+ }
+ }
+ }
+
+ private void sortAndFlushChunkMetadata() throws IOException {
+ // group by series
+ Map<Path, List<IChunkMetadata>> chunkMetadataListMap = groupChunkMetadataListBySeries();
+ if (tempOutput == null) {
+ tempOutput = new LocalTsFileOutput(new FileOutputStream(chunkMetadataTempFile));
+ }
+ sortedSegmentPosition.add(tempOutput.getPosition());
+ for (Map.Entry<Path, List<IChunkMetadata>> entry : chunkMetadataListMap.entrySet()) {
+ List<IChunkMetadata> iChunkMetadataList = entry.getValue();
+ if (iChunkMetadataList.size() > 0
+ && iChunkMetadataList.get(0).getDataType() == TSDataType.VECTOR) {
+ // this is a vector chunk
+ ReadWriteIOUtils.write(VECTOR_TYPE, tempOutput);
+ ReadWriteIOUtils.write(chunkMetadataList.size(), tempOutput);
+ } else {
+ ReadWriteIOUtils.write(NORMAL_TYPE, tempOutput);
+ }
+ for (IChunkMetadata chunkMetadata : iChunkMetadataList) {
+ chunkMetadata.serializeTo(tempOutput, true);
+ }
+ }
+ }
+
+ @Override
+ public void endFile() {
+ // super.endFile();
+ }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
index 2f865f297f..59a8ec236d 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
@@ -76,7 +76,7 @@ public class TsFileIOWriter implements AutoCloseable {
protected File file;
// current flushed Chunk
- private ChunkMetadata currentChunkMetadata;
+ protected ChunkMetadata currentChunkMetadata;
// current flushed ChunkGroup
protected List<ChunkMetadata> chunkMetadataList = new ArrayList<>();
// all flushed ChunkGroups
@@ -240,6 +240,28 @@ public class TsFileIOWriter implements AutoCloseable {
currentChunkMetadata = null;
}
+ protected Map<Path, List<IChunkMetadata>> groupChunkMetadataListBySeries() {
+ // group ChunkMetadata by series
+ Map<Path, List<IChunkMetadata>> chunkMetadataListMap = new TreeMap<>();
+
+ for (ChunkGroupMetadata chunkGroupMetadata : chunkGroupMetadataList) {
+ List<ChunkMetadata> chunkMetadatas = chunkGroupMetadata.getChunkMetadataList();
+ for (IChunkMetadata chunkMetadata : chunkMetadatas) {
+ Path series = new Path(chunkGroupMetadata.getDevice(), chunkMetadata.getMeasurementUid());
+ chunkMetadataListMap.computeIfAbsent(series, k -> new ArrayList<>()).add(chunkMetadata);
+ }
+ }
+
+ if (chunkMetadataList != null && chunkMetadataList.size() > 0) {
+ ChunkMetadata chunkMetadata = chunkMetadataList.get(0);
+ Path series = new Path(currentChunkGroupDeviceId, chunkMetadata.getMeasurementUid());
+ chunkMetadataListMap
+ .computeIfAbsent(series, k -> new ArrayList<>())
+ .addAll(chunkMetadataList);
+ }
+ return chunkMetadataListMap;
+ }
+
/**
* write {@linkplain TsFileMetadata TSFileMetaData} to output stream and close it.
*
@@ -253,15 +275,7 @@ public class TsFileIOWriter implements AutoCloseable {
ReadWriteIOUtils.write(MetaMarker.SEPARATOR, out.wrapAsStream());
// group ChunkMetadata by series
- Map<Path, List<IChunkMetadata>> chunkMetadataListMap = new TreeMap<>();
-
- for (ChunkGroupMetadata chunkGroupMetadata : chunkGroupMetadataList) {
- List<ChunkMetadata> chunkMetadatas = chunkGroupMetadata.getChunkMetadataList();
- for (IChunkMetadata chunkMetadata : chunkMetadatas) {
- Path series = new Path(chunkGroupMetadata.getDevice(), chunkMetadata.getMeasurementUid());
- chunkMetadataListMap.computeIfAbsent(series, k -> new ArrayList<>()).add(chunkMetadata);
- }
- }
+ Map<Path, List<IChunkMetadata>> chunkMetadataListMap = groupChunkMetadataListBySeries();
MetadataIndexNode metadataIndex = flushMetadataIndex(chunkMetadataListMap);
TsFileMetadata tsFileMetaData = new TsFileMetadata();