You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by su...@apache.org on 2022/01/10 02:11:37 UTC
[iotdb] 01/01: TsFileSketchTool
This is an automated email from the ASF dual-hosted git repository.
sunzesong pushed a commit to branch tsfile_split
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit cc91eeda55ffe0f249b793b3a9ac281b6d1f1651
Author: Zesong Sun <v-...@microsoft.com>
AuthorDate: Mon Jan 10 10:10:04 2022 +0800
TsFileSketchTool
---
.../java/org/apache/iotdb/tsfile/Constant.java | 1 +
.../apache/iotdb/db/tools/TsFileRewriteTool.java | 2 +
.../apache/iotdb/db/tools/TsFileSketchTool.java | 12 +-
.../org/apache/iotdb/db/tools/TsFileSplitTool.java | 180 +++++++++++++++++++++
4 files changed, 192 insertions(+), 3 deletions(-)
diff --git a/example/tsfile/src/main/java/org/apache/iotdb/tsfile/Constant.java b/example/tsfile/src/main/java/org/apache/iotdb/tsfile/Constant.java
index 09f4213..62dba14 100644
--- a/example/tsfile/src/main/java/org/apache/iotdb/tsfile/Constant.java
+++ b/example/tsfile/src/main/java/org/apache/iotdb/tsfile/Constant.java
@@ -28,4 +28,5 @@ public class Constant {
static final String SENSOR_3 = "sensor_3";
static final String DEVICE_PREFIX = "device_";
+ static final String SENSOR_ = "sensor_";
}
diff --git a/server/src/main/java/org/apache/iotdb/db/tools/TsFileRewriteTool.java b/server/src/main/java/org/apache/iotdb/db/tools/TsFileRewriteTool.java
index 3f04c28..1f8dfeb 100644
--- a/server/src/main/java/org/apache/iotdb/db/tools/TsFileRewriteTool.java
+++ b/server/src/main/java/org/apache/iotdb/db/tools/TsFileRewriteTool.java
@@ -126,6 +126,8 @@ public class TsFileRewriteTool implements AutoCloseable {
}
}
+ /**
+  * No-argument constructor with an empty body; it performs no initialization of its own.
+  * TsFileSplitTool invokes it through {@code super()}.
+  */
+ public TsFileRewriteTool() {}
+
/**
* Rewrite an old file to the latest version
*
diff --git a/server/src/main/java/org/apache/iotdb/db/tools/TsFileSketchTool.java b/server/src/main/java/org/apache/iotdb/db/tools/TsFileSketchTool.java
index 947c1b7..55a1398 100644
--- a/server/src/main/java/org/apache/iotdb/db/tools/TsFileSketchTool.java
+++ b/server/src/main/java/org/apache/iotdb/db/tools/TsFileSketchTool.java
@@ -23,7 +23,13 @@ import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
import org.apache.iotdb.tsfile.file.MetaMarker;
import org.apache.iotdb.tsfile.file.header.ChunkGroupHeader;
import org.apache.iotdb.tsfile.file.header.PageHeader;
-import org.apache.iotdb.tsfile.file.metadata.*;
+import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetadata;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.MetadataIndexEntry;
+import org.apache.iotdb.tsfile.file.metadata.MetadataIndexNode;
+import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
+import org.apache.iotdb.tsfile.file.metadata.TsFileMetadata;
import org.apache.iotdb.tsfile.file.metadata.enums.MetadataIndexNodeType;
import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
@@ -51,8 +57,8 @@ public class TsFileSketchTool {
public static void main(String[] args) throws IOException {
Pair<String, String> fileNames = checkArgs(args);
- String filename = fileNames.left;
- String outFile = fileNames.right;
+ String filename = "C:\\Users\\v-zesongsun\\Desktop\\test0-0.tsfile"; // fileNames.left;
+ String outFile = "C:\\Users\\v-zesongsun\\Desktop\\test0-0.txt"; // fileNames.right;
System.out.println("TsFile path:" + filename);
System.out.println("Sketch save path:" + outFile);
new TsFileSketchTool(filename, outFile).run();
diff --git a/server/src/main/java/org/apache/iotdb/db/tools/TsFileSplitTool.java b/server/src/main/java/org/apache/iotdb/db/tools/TsFileSplitTool.java
new file mode 100644
index 0000000..84d114e
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/tools/TsFileSplitTool.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.tools;
+
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
+import org.apache.iotdb.tsfile.encoding.decoder.Decoder;
+import org.apache.iotdb.tsfile.exception.write.PageException;
+import org.apache.iotdb.tsfile.file.MetaMarker;
+import org.apache.iotdb.tsfile.file.header.ChunkHeader;
+import org.apache.iotdb.tsfile.file.header.PageHeader;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
+import org.apache.iotdb.tsfile.read.common.Chunk;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl;
+import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class TsFileSplitTool extends TsFileRewriteTool {
+
+ private static final Logger logger = LoggerFactory.getLogger(TsFileSplitTool.class);
+
+ private String filename;
+ private TsFileSequenceReader reader;
+
+ /**
+ * If the chunk point num is lower than this threshold, it will be deserialized into points,
+ * default is 100
+ */
+ private final long chunkPointNumLowerBoundInCompaction = 100;
+
+ public static void main(String[] args) throws IOException {
+ String fileName = "C:\\Users\\v-zesongsun\\Desktop\\test0.tsfile"; // args[0];
+ logger.info("Splitting TsFile {} ...", fileName);
+ try {
+ new TsFileSplitTool(fileName).run();
+ } catch (IllegalPathException | PageException e) {
+ e.printStackTrace();
+ }
+ }
+
+ /**
+ * construct TsFileSketchTool
+ *
+ * @param filename input file path
+ */
+ public TsFileSplitTool(String filename) {
+ super();
+ try {
+ this.filename = filename;
+ this.reader = new TsFileSequenceReader(filename);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+
+ /** entry of tool */
+ public void run() throws IOException, IllegalPathException, PageException {
+ Iterator<List<Path>> pathIterator = reader.getPathsIterator();
+ Set<String> devices = new HashSet<>();
+ String fileNamePrefix = filename.substring(0, filename.length() - 7).concat("-");
+ int fileIndex = 0;
+ TsFileIOWriter tsFileIOWriter = null;
+ while (pathIterator.hasNext()) {
+ for (Path path : pathIterator.next()) {
+ String deviceId = path.getDevice();
+ if (devices.add(deviceId)) {
+ if (tsFileIOWriter != null) {
+ // seal last TsFile
+ tsFileIOWriter.endFile();
+ }
+
+ // open a new TsFile
+ tsFileIOWriter =
+ new TsFileIOWriter(
+ FSFactoryProducer.getFSFactory()
+ .getFile(fileNamePrefix + fileIndex + TsFileConstant.TSFILE_SUFFIX));
+ fileIndex++;
+ }
+
+ List<ChunkMetadata> chunkMetadataList = reader.getChunkMetadataList(path);
+ assert tsFileIOWriter != null;
+ tsFileIOWriter.startChunkGroup(deviceId);
+ boolean firstChunkInChunkGroup = true;
+
+ for (ChunkMetadata chunkMetadata : chunkMetadataList) {
+ Chunk chunk = reader.readMemChunk(chunkMetadata);
+
+ ChunkHeader chunkHeader = chunk.getHeader();
+ MeasurementSchema measurementSchema =
+ new MeasurementSchema(
+ chunkHeader.getMeasurementID(),
+ chunkHeader.getDataType(),
+ chunkHeader.getEncodingType(),
+ chunkHeader.getCompressionType());
+ TSDataType dataType = chunkHeader.getDataType();
+ List<PageHeader> pageHeadersInChunk = new ArrayList<>();
+ List<ByteBuffer> dataInChunk = new ArrayList<>();
+ int dataSize = chunkHeader.getDataSize();
+ while (dataSize > 0) {
+ // a new Page
+ PageHeader pageHeader =
+ reader.readPageHeader(
+ dataType, chunkHeader.getChunkType() == MetaMarker.CHUNK_HEADER);
+ boolean needToDecode = pageHeader.getStatistics() == null; // ignore modification
+ ByteBuffer pageData =
+ !needToDecode
+ ? reader.readCompressedPage(pageHeader)
+ : reader.readPage(pageHeader, chunkHeader.getCompressionType());
+ pageHeadersInChunk.add(pageHeader);
+ dataInChunk.add(pageData);
+ dataSize -= pageHeader.getSerializedPageSize();
+ }
+ reWriteChunk(
+ deviceId, firstChunkInChunkGroup, measurementSchema, pageHeadersInChunk, dataInChunk);
+ }
+ tsFileIOWriter.endChunkGroup();
+ }
+ }
+ logger.info("TsFile {} is split into {} new files.", filename, devices.size());
+ }
+
+ protected void reWriteChunk(
+ String deviceId,
+ boolean firstChunkInChunkGroup,
+ MeasurementSchema schema,
+ List<PageHeader> pageHeadersInChunk,
+ List<ByteBuffer> pageDataInChunk)
+ throws IOException, PageException {
+ valueDecoder = Decoder.getDecoderByType(schema.getEncodingType(), schema.getType());
+ Map<Long, ChunkWriterImpl> partitionChunkWriterMap = new HashMap<>();
+ for (int i = 0; i < pageDataInChunk.size(); i++) {
+ writePage(schema, pageHeadersInChunk.get(i), pageDataInChunk.get(i), partitionChunkWriterMap);
+ }
+ for (Map.Entry<Long, ChunkWriterImpl> entry : partitionChunkWriterMap.entrySet()) {
+ long partitionId = entry.getKey();
+ TsFileIOWriter tsFileIOWriter = partitionWriterMap.get(partitionId);
+ if (firstChunkInChunkGroup || !tsFileIOWriter.isWritingChunkGroup()) {
+ tsFileIOWriter.startChunkGroup(deviceId);
+ }
+ // write chunks to their own upgraded tsFiles
+ IChunkWriter chunkWriter = entry.getValue();
+ chunkWriter.writeToFileWriter(tsFileIOWriter);
+ }
+ }
+}