You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ma...@apache.org on 2022/09/06 09:09:38 UTC
[iotdb] branch IOTDB-4251 updated: finish memory control tsfile io writer
This is an automated email from the ASF dual-hosted git repository.
marklau99 pushed a commit to branch IOTDB-4251
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/IOTDB-4251 by this push:
new 1afdabe328 finish memory control tsfile io writer
1afdabe328 is described below
commit 1afdabe328399e5c6d474ed277d51a5d99625cf6
Author: Liu Xuxin <li...@outlook.com>
AuthorDate: Tue Sep 6 17:09:28 2022 +0800
finish memory control tsfile io writer
---
.../file/metadata/MetadataIndexConstructor.java | 4 +-
.../tsfile/file/metadata/MetadataIndexNode.java | 2 +-
.../iotdb/tsfile/file/metadata/TsFileMetadata.java | 9 +-
.../write/writer/MemoryControlTsFileIOWriter.java | 123 ++++++++++++++++++++-
.../iotdb/tsfile/write/writer/TsFileIOWriter.java | 4 +-
5 files changed, 132 insertions(+), 10 deletions(-)
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexConstructor.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexConstructor.java
index 062ffd6183..beb9ddb3e5 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexConstructor.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexConstructor.java
@@ -123,7 +123,7 @@ public class MetadataIndexConstructor {
* @param out tsfile output
* @param type MetadataIndexNode type
*/
- private static MetadataIndexNode generateRootNode(
+ public static MetadataIndexNode generateRootNode(
Queue<MetadataIndexNode> metadataIndexNodeQueue, TsFileOutput out, MetadataIndexNodeType type)
throws IOException {
int queueSize = metadataIndexNodeQueue.size();
@@ -148,7 +148,7 @@ public class MetadataIndexConstructor {
return metadataIndexNodeQueue.poll();
}
- private static void addCurrentIndexNodeToQueue(
+ public static void addCurrentIndexNodeToQueue(
MetadataIndexNode currentIndexNode,
Queue<MetadataIndexNode> metadataIndexNodeQueue,
TsFileOutput out)
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexNode.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexNode.java
index 3f6f6336b3..1d3972cafe 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexNode.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexNode.java
@@ -74,7 +74,7 @@ public class MetadataIndexNode {
this.children.add(metadataIndexEntry);
}
- boolean isFull() {
+ public boolean isFull() {
return children.size() >= config.getMaxDegreeOfIndexNode();
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/TsFileMetadata.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/TsFileMetadata.java
index 95e01e2da1..f6f974fc1a 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/TsFileMetadata.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/TsFileMetadata.java
@@ -105,10 +105,15 @@ public class TsFileMetadata {
* @param outputStream -output stream to determine byte length
* @return -byte length
*/
- public int serializeBloomFilter(OutputStream outputStream, Set<Path> paths) throws IOException {
- int byteLen = 0;
+ public int buildAndSerializeBloomFilter(OutputStream outputStream, Set<Path> paths)
+ throws IOException {
BloomFilter filter = buildBloomFilter(paths);
+ return serializeBloomFilter(outputStream, filter);
+ }
+ public int serializeBloomFilter(OutputStream outputStream, BloomFilter filter)
+ throws IOException {
+ int byteLen = 0;
byte[] bytes = filter.serialize();
byteLen += ReadWriteForEncodingUtils.writeUnsignedVarInt(bytes.length, outputStream);
outputStream.write(bytes);
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/MemoryControlTsFileIOWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/MemoryControlTsFileIOWriter.java
index 371f4d2ae7..acd0ba1aa4 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/MemoryControlTsFileIOWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/MemoryControlTsFileIOWriter.java
@@ -19,14 +19,21 @@
package org.apache.iotdb.tsfile.write.writer;
+import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.file.MetaMarker;
import org.apache.iotdb.tsfile.file.metadata.AlignedChunkMetadata;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.MetadataIndexEntry;
+import org.apache.iotdb.tsfile.file.metadata.MetadataIndexNode;
import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
+import org.apache.iotdb.tsfile.file.metadata.TsFileMetadata;
+import org.apache.iotdb.tsfile.file.metadata.enums.MetadataIndexNodeType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.reader.LocalTsFileInput;
+import org.apache.iotdb.tsfile.utils.BloomFilter;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.utils.PublicBAOS;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
@@ -43,6 +50,10 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
+import java.util.TreeMap;
+
+import static org.apache.iotdb.tsfile.file.metadata.MetadataIndexConstructor.addCurrentIndexNodeToQueue;
+import static org.apache.iotdb.tsfile.file.metadata.MetadataIndexConstructor.generateRootNode;
public class MemoryControlTsFileIOWriter extends TsFileIOWriter {
private static final Logger LOG = LoggerFactory.getLogger(MemoryControlTsFileIOWriter.class);
@@ -54,6 +65,9 @@ public class MemoryControlTsFileIOWriter extends TsFileIOWriter {
// it stores the start address of persisted chunk metadata for per series
protected Queue<Long> segmentForPerSeries = new ArrayDeque<>();
protected String currentSeries = null;
+ // record the total number of paths, which is used to size the bloom filter
+ protected int pathCount = 0;
+ Path lastSerializePath = null;
public static final String CHUNK_METADATA_TEMP_FILE_PREFIX = ".cmt";
private static final byte VECTOR_TYPE = 1;
@@ -99,8 +113,12 @@ public class MemoryControlTsFileIOWriter extends TsFileIOWriter {
for (Map.Entry<Path, List<IChunkMetadata>> entry : chunkMetadataListMap.entrySet()) {
segmentForPerSeries.add(tempOutput.getPosition());
Path seriesPath = entry.getKey();
+ if (!seriesPath.equals(lastSerializePath)) {
+ pathCount++;
+ }
List<IChunkMetadata> iChunkMetadataList = entry.getValue();
writeChunkMetadata(iChunkMetadataList, seriesPath, tempOutput);
+ lastSerializePath = seriesPath;
}
}
@@ -188,20 +206,117 @@ public class MemoryControlTsFileIOWriter extends TsFileIOWriter {
// serialize the SEPARATOR of MetaData
ReadWriteIOUtils.write(MetaMarker.SEPARATOR, out.wrapAsStream());
+
ChunkMetadataReadIterator iterator =
new ChunkMetadataReadIterator(
0,
chunkMetadataTempFile.length(),
new LocalTsFileInput(chunkMetadataTempFile.toPath()));
+ Map<String, MetadataIndexNode> deviceMetadataIndexMap = new TreeMap<>();
+ Queue<MetadataIndexNode> measurementMetadataIndexQueue = null;
+ String currentDevice = null;
+ String prevDevice = null;
+ MetadataIndexNode currentIndexNode =
+ new MetadataIndexNode(MetadataIndexNodeType.LEAF_MEASUREMENT);
+ TSFileConfig config = TSFileDescriptor.getInstance().getConfig();
+ int seriesIdxForCurrDevice = 0;
+ BloomFilter filter =
+ BloomFilter.getEmptyBloomFilter(
+ TSFileDescriptor.getInstance().getConfig().getBloomFilterErrorRate(), pathCount);
while (iterator.hasNextChunkMetadata()) {
- // 1. read in all chunk metadata of one series
- // 2. construct the timeseries metadata for this series
- // 3. construct the index tree node for the series
- // 4. serialize the timeseries metadata to file
+ // read in all chunk metadata of one series
+ // construct the timeseries metadata for this series
TimeseriesMetadata timeseriesMetadata = readTimeseriesMetadata(iterator);
+ // build bloom filter
+ filter.add(currentSeries);
+ // construct the index tree node for the series
+ currentDevice = new Path(currentSeries).getDevice();
+ if (!currentDevice.equals(prevDevice)) {
+ if (prevDevice != null) {
+ addCurrentIndexNodeToQueue(currentIndexNode, measurementMetadataIndexQueue, out);
+ deviceMetadataIndexMap.put(
+ prevDevice,
+ generateRootNode(
+ measurementMetadataIndexQueue, out, MetadataIndexNodeType.INTERNAL_MEASUREMENT));
+ currentIndexNode = new MetadataIndexNode(MetadataIndexNodeType.LEAF_MEASUREMENT);
+ }
+ measurementMetadataIndexQueue = new ArrayDeque<>();
+ seriesIdxForCurrDevice = 0;
+ }
+
+ if (seriesIdxForCurrDevice % config.getMaxDegreeOfIndexNode() == 0) {
+ if (currentIndexNode.isFull()) {
+ addCurrentIndexNodeToQueue(currentIndexNode, measurementMetadataIndexQueue, out);
+ currentIndexNode = new MetadataIndexNode(MetadataIndexNodeType.LEAF_MEASUREMENT);
+ }
+ currentIndexNode.addEntry(
+ new MetadataIndexEntry(timeseriesMetadata.getMeasurementId(), out.getPosition()));
+ }
+
+ prevDevice = currentDevice;
+ seriesIdxForCurrDevice++;
+ // serialize the timeseries metadata to file
+ timeseriesMetadata.serializeTo(out.wrapAsStream());
+ }
+
+ MetadataIndexNode metadataIndex = null;
+ // if the number of devices does not exceed the max degree of an index node, skip the
+ // device-level index tree and point directly to the measurement index nodes
+ if (deviceMetadataIndexMap.size() <= config.getMaxDegreeOfIndexNode()) {
+ MetadataIndexNode metadataIndexNode =
+ new MetadataIndexNode(MetadataIndexNodeType.LEAF_DEVICE);
+ for (Map.Entry<String, MetadataIndexNode> entry : deviceMetadataIndexMap.entrySet()) {
+ metadataIndexNode.addEntry(new MetadataIndexEntry(entry.getKey(), out.getPosition()));
+ entry.getValue().serializeTo(out.wrapAsStream());
+ }
+ metadataIndexNode.setEndOffset(out.getPosition());
+ metadataIndex = metadataIndexNode;
+ } else {
+ // else, build level index for devices
+ Queue<MetadataIndexNode> deviceMetadataIndexQueue = new ArrayDeque<>();
+ currentIndexNode = new MetadataIndexNode(MetadataIndexNodeType.LEAF_DEVICE);
+
+ for (Map.Entry<String, MetadataIndexNode> entry : deviceMetadataIndexMap.entrySet()) {
+ // when constructing from internal node, each node is related to an entry
+ if (currentIndexNode.isFull()) {
+ addCurrentIndexNodeToQueue(currentIndexNode, deviceMetadataIndexQueue, out);
+ currentIndexNode = new MetadataIndexNode(MetadataIndexNodeType.LEAF_DEVICE);
+ }
+ currentIndexNode.addEntry(new MetadataIndexEntry(entry.getKey(), out.getPosition()));
+ entry.getValue().serializeTo(out.wrapAsStream());
+ }
+ addCurrentIndexNodeToQueue(currentIndexNode, deviceMetadataIndexQueue, out);
+ MetadataIndexNode deviceMetadataIndexNode =
+ generateRootNode(deviceMetadataIndexQueue, out, MetadataIndexNodeType.INTERNAL_DEVICE);
+ deviceMetadataIndexNode.setEndOffset(out.getPosition());
+ metadataIndex = deviceMetadataIndexNode;
}
+
+ TsFileMetadata tsFileMetadata = new TsFileMetadata();
+ tsFileMetadata.setMetadataIndex(metadataIndex);
+ tsFileMetadata.setMetaOffset(metaOffset);
+
+ int size = tsFileMetadata.serializeTo(out.wrapAsStream());
+ size += tsFileMetadata.serializeBloomFilter(out.wrapAsStream(), filter);
+
+ // write TsFileMetaData size
+ ReadWriteIOUtils.write(size, out.wrapAsStream());
+
+ // write magic string
+ out.write(MAGIC_STRING_BYTES);
+
+ // close file
+ out.close();
+ canWrite = false;
}
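+ /**
+ * Read in all the chunk metadata for a series, and construct a TimeseriesMetadata for it.
+ *
+ * @param iterator iterator over the chunk metadata to read from
+ * @return the TimeseriesMetadata constructed for the series
+ * @throws IOException if an I/O error occurs while reading the chunk metadata
+ */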
private TimeseriesMetadata readTimeseriesMetadata(ChunkMetadataReadIterator iterator)
throws IOException {
Pair<String, IChunkMetadata> currentPair = iterator.getCurrentPair();
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
index c34ab1df63..aba72ce2e4 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
@@ -294,7 +294,9 @@ public class TsFileIOWriter implements AutoCloseable {
}
// write bloom filter
- size += tsFileMetaData.serializeBloomFilter(out.wrapAsStream(), chunkMetadataListMap.keySet());
+ size +=
+ tsFileMetaData.buildAndSerializeBloomFilter(
+ out.wrapAsStream(), chunkMetadataListMap.keySet());
if (logger.isDebugEnabled()) {
logger.debug("finish flushing the bloom filter file pos:{}", out.getPosition());
}