You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by su...@apache.org on 2022/01/10 02:11:37 UTC

[iotdb] 01/01: TsFileSketchTool

This is an automated email from the ASF dual-hosted git repository.

sunzesong pushed a commit to branch tsfile_split
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit cc91eeda55ffe0f249b793b3a9ac281b6d1f1651
Author: Zesong Sun <v-...@microsoft.com>
AuthorDate: Mon Jan 10 10:10:04 2022 +0800

    TsFileSketchTool
---
 .../java/org/apache/iotdb/tsfile/Constant.java     |   1 +
 .../apache/iotdb/db/tools/TsFileRewriteTool.java   |   2 +
 .../apache/iotdb/db/tools/TsFileSketchTool.java    |  12 +-
 .../org/apache/iotdb/db/tools/TsFileSplitTool.java | 180 +++++++++++++++++++++
 4 files changed, 192 insertions(+), 3 deletions(-)

diff --git a/example/tsfile/src/main/java/org/apache/iotdb/tsfile/Constant.java b/example/tsfile/src/main/java/org/apache/iotdb/tsfile/Constant.java
index 09f4213..62dba14 100644
--- a/example/tsfile/src/main/java/org/apache/iotdb/tsfile/Constant.java
+++ b/example/tsfile/src/main/java/org/apache/iotdb/tsfile/Constant.java
@@ -28,4 +28,5 @@ public class Constant {
   static final String SENSOR_3 = "sensor_3";
 
   static final String DEVICE_PREFIX = "device_";
+  static final String SENSOR_ = "sensor_";
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/tools/TsFileRewriteTool.java b/server/src/main/java/org/apache/iotdb/db/tools/TsFileRewriteTool.java
index 3f04c28..1f8dfeb 100644
--- a/server/src/main/java/org/apache/iotdb/db/tools/TsFileRewriteTool.java
+++ b/server/src/main/java/org/apache/iotdb/db/tools/TsFileRewriteTool.java
@@ -126,6 +126,8 @@ public class TsFileRewriteTool implements AutoCloseable {
     }
   }
 
+  public TsFileRewriteTool() {}
+
   /**
    * Rewrite an old file to the latest version
    *
diff --git a/server/src/main/java/org/apache/iotdb/db/tools/TsFileSketchTool.java b/server/src/main/java/org/apache/iotdb/db/tools/TsFileSketchTool.java
index 947c1b7..55a1398 100644
--- a/server/src/main/java/org/apache/iotdb/db/tools/TsFileSketchTool.java
+++ b/server/src/main/java/org/apache/iotdb/db/tools/TsFileSketchTool.java
@@ -23,7 +23,13 @@ import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
 import org.apache.iotdb.tsfile.file.MetaMarker;
 import org.apache.iotdb.tsfile.file.header.ChunkGroupHeader;
 import org.apache.iotdb.tsfile.file.header.PageHeader;
-import org.apache.iotdb.tsfile.file.metadata.*;
+import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetadata;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.MetadataIndexEntry;
+import org.apache.iotdb.tsfile.file.metadata.MetadataIndexNode;
+import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
+import org.apache.iotdb.tsfile.file.metadata.TsFileMetadata;
 import org.apache.iotdb.tsfile.file.metadata.enums.MetadataIndexNodeType;
 import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
 import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
@@ -51,8 +57,8 @@ public class TsFileSketchTool {
 
   public static void main(String[] args) throws IOException {
     Pair<String, String> fileNames = checkArgs(args);
-    String filename = fileNames.left;
-    String outFile = fileNames.right;
+    String filename = "C:\\Users\\v-zesongsun\\Desktop\\test0-0.tsfile"; // fileNames.left;
+    String outFile = "C:\\Users\\v-zesongsun\\Desktop\\test0-0.txt"; // fileNames.right;
     System.out.println("TsFile path:" + filename);
     System.out.println("Sketch save path:" + outFile);
     new TsFileSketchTool(filename, outFile).run();
diff --git a/server/src/main/java/org/apache/iotdb/db/tools/TsFileSplitTool.java b/server/src/main/java/org/apache/iotdb/db/tools/TsFileSplitTool.java
new file mode 100644
index 0000000..84d114e
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/tools/TsFileSplitTool.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.tools;
+
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
+import org.apache.iotdb.tsfile.encoding.decoder.Decoder;
+import org.apache.iotdb.tsfile.exception.write.PageException;
+import org.apache.iotdb.tsfile.file.header.PageHeader;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
+import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
+import org.apache.iotdb.tsfile.read.common.Chunk;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl;
+import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Splits one TsFile into several new TsFiles, one per device.
+ *
+ * <p>For an input file {@code <prefix>.tsfile} the outputs are written next to it as {@code
+ * <prefix>-0.tsfile}, {@code <prefix>-1.tsfile}, ... in the order in which devices are first
+ * returned by {@link TsFileSequenceReader#getPathsIterator()}. Chunks are copied verbatim (chunk
+ * header plus page data); modification (.mods) files are not read.
+ *
+ * <p>Usage: {@code TsFileSplitTool <path of the TsFile to split>}
+ */
+public class TsFileSplitTool extends TsFileRewriteTool {
+
+  private static final Logger logger = LoggerFactory.getLogger(TsFileSplitTool.class);
+
+  /** Path of the TsFile being split; also the base of the output file names. */
+  private final String filename;
+
+  /**
+   * Reader over {@link #filename}. NOTE(review): this hides any {@code reader} field declared in
+   * {@link TsFileRewriteTool}; confirm the parent's close() does not depend on its own copy.
+   */
+  private final TsFileSequenceReader reader;
+
+  /**
+   * Command-line entry point.
+   *
+   * @param args {@code args[0]} is the path of the TsFile to split
+   */
+  public static void main(String[] args) throws IOException {
+    if (args.length < 1) {
+      logger.error("Usage: TsFileSplitTool <path of the TsFile to split>");
+      return;
+    }
+    String fileName = args[0];
+    logger.info("Splitting TsFile {} ...", fileName);
+    try {
+      new TsFileSplitTool(fileName).run();
+    } catch (IllegalPathException | PageException e) {
+      logger.error("Failed to split TsFile {}", fileName, e);
+    }
+  }
+
+  /**
+   * Constructs the tool and opens the input file for reading.
+   *
+   * @param filename path of the TsFile to split
+   * @throws UncheckedIOException if the file cannot be opened
+   */
+  public TsFileSplitTool(String filename) {
+    super();
+    this.filename = filename;
+    try {
+      this.reader = new TsFileSequenceReader(filename);
+    } catch (IOException e) {
+      // fail fast: without a reader the tool cannot do anything useful
+      throw new UncheckedIOException("Cannot open TsFile " + filename, e);
+    }
+  }
+
+  /**
+   * Entry of the tool: copies the chunks of every device into that device's own output file.
+   *
+   * <p>An output file is opened when a device is first seen and is sealed with {@link
+   * TsFileIOWriter#endFile()} when the next device starts or the input is exhausted. All chunks
+   * of one device are written into a single chunk group. The input reader is closed before this
+   * method returns, so an instance can be run only once.
+   *
+   * @throws IOException if reading the input or writing an output file fails
+   * @throws IllegalStateException if a device's paths are not returned contiguously
+   */
+  public void run() throws IOException, IllegalPathException, PageException {
+    String fileNamePrefix = outputFileNamePrefix(filename);
+    Set<String> devices = new HashSet<>();
+    String currentDevice = null;
+    int fileIndex = 0;
+    TsFileIOWriter tsFileIOWriter = null;
+    try {
+      Iterator<List<Path>> pathIterator = reader.getPathsIterator();
+      while (pathIterator.hasNext()) {
+        for (Path path : pathIterator.next()) {
+          String deviceId = path.getDevice();
+          if (!deviceId.equals(currentDevice)) {
+            if (!devices.add(deviceId)) {
+              // its output file is already sealed; appending elsewhere would mix devices
+              throw new IllegalStateException(
+                  "Paths of device " + deviceId + " are not contiguous in " + filename);
+            }
+            sealFile(tsFileIOWriter);
+            tsFileIOWriter =
+                new TsFileIOWriter(
+                    FSFactoryProducer.getFSFactory()
+                        .getFile(fileNamePrefix + fileIndex + TsFileConstant.TSFILE_SUFFIX));
+            fileIndex++;
+            tsFileIOWriter.startChunkGroup(deviceId);
+            currentDevice = deviceId;
+          }
+          // copy each chunk verbatim (header + page data) into the current device's file
+          for (ChunkMetadata chunkMetadata : reader.getChunkMetadataList(path)) {
+            Chunk chunk = reader.readMemChunk(chunkMetadata);
+            tsFileIOWriter.writeChunk(chunk, chunkMetadata);
+          }
+        }
+      }
+      sealFile(tsFileIOWriter);
+    } finally {
+      reader.close();
+    }
+    logger.info("TsFile {} is split into {} new files.", filename, fileIndex);
+  }
+
+  /** Returns {@code filename} with a trailing {@code .tsfile} suffix removed, followed by "-". */
+  private static String outputFileNamePrefix(String filename) {
+    String base =
+        filename.endsWith(TsFileConstant.TSFILE_SUFFIX)
+            ? filename.substring(0, filename.length() - TsFileConstant.TSFILE_SUFFIX.length())
+            : filename;
+    return base + "-";
+  }
+
+  /** Ends the open chunk group of {@code writer} and seals its file; does nothing if null. */
+  private static void sealFile(TsFileIOWriter writer) throws IOException {
+    if (writer != null) {
+      writer.endChunkGroup();
+      writer.endFile();
+    }
+  }
+
+  /**
+   * Rewrites the pages of one chunk through per-time-partition chunk writers.
+   *
+   * <p>NOTE(review): not used by {@link #run()}. It writes into the writers held in the inherited
+   * {@code partitionWriterMap} and relies on the inherited {@code writePage} and {@code
+   * valueDecoder}; nothing in this class populates that map, so a missing entry makes {@code
+   * tsFileIOWriter} null below. Confirm the parent initializes it before calling this method.
+   *
+   * @param deviceId device that owns the chunk
+   * @param firstChunkInChunkGroup if true, start a chunk group on each target writer regardless
+   * @param schema measurement schema of the chunk
+   * @param pageHeadersInChunk page headers, index-aligned with {@code pageDataInChunk}
+   * @param pageDataInChunk page bodies, one per header
+   */
+  protected void reWriteChunk(
+      String deviceId,
+      boolean firstChunkInChunkGroup,
+      MeasurementSchema schema,
+      List<PageHeader> pageHeadersInChunk,
+      List<ByteBuffer> pageDataInChunk)
+      throws IOException, PageException {
+    valueDecoder = Decoder.getDecoderByType(schema.getEncodingType(), schema.getType());
+    Map<Long, ChunkWriterImpl> partitionChunkWriterMap = new HashMap<>();
+    for (int i = 0; i < pageDataInChunk.size(); i++) {
+      writePage(schema, pageHeadersInChunk.get(i), pageDataInChunk.get(i), partitionChunkWriterMap);
+    }
+    for (Map.Entry<Long, ChunkWriterImpl> entry : partitionChunkWriterMap.entrySet()) {
+      long partitionId = entry.getKey();
+      TsFileIOWriter tsFileIOWriter = partitionWriterMap.get(partitionId);
+      if (firstChunkInChunkGroup || !tsFileIOWriter.isWritingChunkGroup()) {
+        tsFileIOWriter.startChunkGroup(deviceId);
+      }
+      // write chunks to their own upgraded tsFiles
+      IChunkWriter chunkWriter = entry.getValue();
+      chunkWriter.writeToFileWriter(tsFileIOWriter);
+    }
+  }
+}