You are viewing a plain-text version of this content. The canonical link to it (the "here" hyperlink in the original page) is not preserved in this plain-text version.
Posted to commits@iotdb.apache.org by ma...@apache.org on 2022/10/17 06:49:11 UTC

[iotdb] branch master updated: [IOTDB-4650] Support starting reading from tail in RewriteTsFileTool (#7604)

This is an automated email from the ASF dual-hosted git repository.

marklau99 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 4eaa59103c [IOTDB-4650] Support starting reading from tail in RewriteTsFileTool (#7604)
4eaa59103c is described below

commit 4eaa59103c2ad877611eade6f68673a185837aa4
Author: Liu Xuxin <37...@users.noreply.github.com>
AuthorDate: Mon Oct 17 14:49:04 2022 +0800

    [IOTDB-4650] Support starting reading from tail in RewriteTsFileTool (#7604)
---
 .../java/org/apache/iotdb/RewriteTsFileTool.java   | 249 ++++++++++++++++++++-
 1 file changed, 240 insertions(+), 9 deletions(-)

diff --git a/rewrite-tsfile-tool/src/main/java/org/apache/iotdb/RewriteTsFileTool.java b/rewrite-tsfile-tool/src/main/java/org/apache/iotdb/RewriteTsFileTool.java
index 653f2de67c..4b1d47976d 100644
--- a/rewrite-tsfile-tool/src/main/java/org/apache/iotdb/RewriteTsFileTool.java
+++ b/rewrite-tsfile-tool/src/main/java/org/apache/iotdb/RewriteTsFileTool.java
@@ -21,9 +21,12 @@ package org.apache.iotdb;
 
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.engine.cache.ChunkCache;
+import org.apache.iotdb.db.engine.compaction.inner.utils.MultiTsFileDeviceIterator;
 import org.apache.iotdb.db.engine.modification.Deletion;
 import org.apache.iotdb.db.engine.modification.Modification;
 import org.apache.iotdb.db.engine.modification.ModificationFile;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.utils.QueryUtils;
 import org.apache.iotdb.rpc.IoTDBConnectionException;
 import org.apache.iotdb.rpc.StatementExecutionException;
@@ -35,11 +38,16 @@ import org.apache.iotdb.tsfile.file.MetaMarker;
 import org.apache.iotdb.tsfile.file.header.ChunkGroupHeader;
 import org.apache.iotdb.tsfile.file.header.ChunkHeader;
 import org.apache.iotdb.tsfile.file.metadata.AlignedChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
 import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
+import org.apache.iotdb.tsfile.read.TimeValuePair;
+import org.apache.iotdb.tsfile.read.TsFileAlignedSeriesReaderIterator;
 import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
+import org.apache.iotdb.tsfile.read.common.Chunk;
 import org.apache.iotdb.tsfile.read.common.Field;
+import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
 import org.apache.iotdb.tsfile.read.common.Path;
 import org.apache.iotdb.tsfile.read.common.RowRecord;
 import org.apache.iotdb.tsfile.read.controller.CachedChunkLoaderImpl;
@@ -48,11 +56,18 @@ import org.apache.iotdb.tsfile.read.controller.IMetadataQuerier;
 import org.apache.iotdb.tsfile.read.controller.MetadataQuerierByFileImpl;
 import org.apache.iotdb.tsfile.read.query.dataset.DataSetWithoutTimeGenerator;
 import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+import org.apache.iotdb.tsfile.read.reader.IChunkReader;
+import org.apache.iotdb.tsfile.read.reader.IPointReader;
+import org.apache.iotdb.tsfile.read.reader.chunk.AlignedChunkReader;
+import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
 import org.apache.iotdb.tsfile.read.reader.series.AbstractFileSeriesReader;
 import org.apache.iotdb.tsfile.read.reader.series.EmptyFileSeriesReader;
 import org.apache.iotdb.tsfile.read.reader.series.FileSeriesReader;
 import org.apache.iotdb.tsfile.utils.FilePathUtils;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
 import org.apache.iotdb.tsfile.write.record.Tablet;
+import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
 import org.apache.commons.cli.CommandLine;
@@ -66,9 +81,11 @@ import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -81,6 +98,7 @@ public class RewriteTsFileTool {
   private static String user = "root";
   private static String password = "root";
   private static String filePath = "";
+  private static String readMode = "s";
 
   private static Map<String, Set<MeasurementSchema>> device2Measurements;
 
@@ -119,6 +137,7 @@ public class RewriteTsFileTool {
       user = getArgOrDefault(commandLine, "u", user);
       password = getArgOrDefault(commandLine, "pw", password);
       filePath = getArgOrDefault(commandLine, "f", filePath);
+      readMode = getArgOrDefault(commandLine, "rm", readMode);
     } catch (ParseException e) {
       System.out.printf("Parse Args Error. %s%n", e.getMessage());
       priHelp(options);
@@ -175,6 +194,15 @@ public class RewriteTsFileTool {
             .required()
             .build();
     options.addOption(filePathOpt);
+
+    Option readModeOpt =
+        Option.builder("rm")
+            .argName("readMode")
+            .hasArg()
+            .desc("Read mode, s(equence) or r(everse)")
+            .required()
+            .build();
+    options.addOption(readModeOpt);
     return options;
   }
 
@@ -207,21 +235,37 @@ public class RewriteTsFileTool {
     List<File> unloadTsFiles = new ArrayList<>();
     System.out.printf("Collect TsFiles successfully, %d files to be loaded.%n", size);
     System.out.println("Start Loading TsFiles...");
-    for (int i = 0; i < size; i++) {
-      File file = files.get(i);
-      System.out.printf("Loading %s(%d/%d)...", file.getPath(), i + 1, size);
+    if (readMode.equals("s")) {
+      for (int i = 0; i < size; i++) {
+        File file = files.get(i);
+        System.out.printf("Loading %s(%d/%d)...", file.getPath(), i + 1, size);
+        try {
+          seqWriteTsFile(file.getPath(), session);
+        } catch (Exception e) {
+          System.out.println(
+              "------------------------------Error Message------------------------------");
+          e.printStackTrace();
+          System.out.println(
+              "------------------------------End Message------------------------------");
+          unloadTsFiles.add(file);
+          continue;
+        }
+        System.out.println("Done");
+      }
+    } else {
       try {
-        writeTsFile(file.getPath(), session);
-      } catch (Exception e) {
+        reverseWriteTsFile(files, session);
+      } catch (IOException
+          | IllegalPathException
+          | IoTDBConnectionException
+          | StatementExecutionException
+          | NoMeasurementException e) {
         System.out.println(
             "------------------------------Error Message------------------------------");
         e.printStackTrace();
         System.out.println(
             "------------------------------End Message------------------------------");
-        unloadTsFiles.add(file);
-        continue;
       }
-      System.out.println("Done");
     }
     System.out.println("Finish Loading TsFiles");
     System.out.printf(
@@ -262,7 +306,7 @@ public class RewriteTsFileTool {
    * @param filename the file path to be loaded
    * @param session IoTDB session
    */
-  public static void writeTsFile(String filename, Session session)
+  public static void seqWriteTsFile(String filename, Session session)
       throws IOException, IllegalPathException, IoTDBConnectionException,
           StatementExecutionException, NoMeasurementException {
     // parse modifications from .mods
@@ -425,4 +469,228 @@ public class RewriteTsFileTool {
           alignedChunkMetadataList, Collections.singletonList(measurementModifications));
     }
   }
+
+  /**
+   * Rewrite the given TsFiles device by device: the chunk metadata of every file is read first,
+   * then each chunk it points to is loaded and written to IoTDB ({@code -rm r} mode).
+   *
+   * <p>Broken chunks are reported and skipped so that the remaining data is still written.
+   * NOTE(review): unlike {@link #seqWriteTsFile}, this path does not apply deletions from .mods.
+   *
+   * @param files the TsFiles to load
+   * @param session IoTDB session the data is written to
+   * @throws IOException if reading the files' metadata fails
+   * @throws IllegalPathException if a device and measurement do not form a valid path
+   */
+  public static void reverseWriteTsFile(List<File> files, Session session)
+      throws IOException, IllegalPathException, IoTDBConnectionException,
+          StatementExecutionException, NoMeasurementException {
+    List<TsFileResource> resources = new ArrayList<>();
+    files.forEach(x -> resources.add(new TsFileResource(x)));
+    try (MultiTsFileDeviceIterator deviceIterator = new MultiTsFileDeviceIterator(resources)) {
+      while (deviceIterator.hasNextDevice()) {
+        Pair<String, Boolean> devicePair = deviceIterator.nextDevice();
+        String device = devicePair.left;
+        boolean isAligned = devicePair.right;
+        if (isAligned) {
+          try {
+            writeAlignedSeries(device, deviceIterator, session);
+          } catch (Throwable t) {
+            // best effort: a failure skips the rest of this aligned device, not the whole run
+            System.out.printf("Skip broken aligned device %s: %s%n", device, t);
+          }
+        } else {
+          MultiTsFileDeviceIterator.MeasurementIterator seriesIterator =
+              deviceIterator.iterateNotAlignedSeries(device, true);
+          while (seriesIterator.hasNextSeries()) {
+            writeSingleSeries(device, seriesIterator, session);
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Write every chunk of the current not-aligned series of {@code device}, file by file.
+   *
+   * <p>A chunk that fails to load or write is reported and skipped.
+   */
+  protected static void writeSingleSeries(
+      String device, MultiTsFileDeviceIterator.MeasurementIterator seriesIterator, Session session)
+      throws IllegalPathException {
+    PartialPath p = new PartialPath(device, seriesIterator.nextSeries());
+    LinkedList<Pair<TsFileSequenceReader, List<ChunkMetadata>>> readerAndChunkMetadataList =
+        seriesIterator.getMetadataListForCurrentSeries();
+    // consume the list as we go so each file's metadata is dropped once it has been written
+    while (!readerAndChunkMetadataList.isEmpty()) {
+      Pair<TsFileSequenceReader, List<ChunkMetadata>> readerMetadataPair =
+          readerAndChunkMetadataList.removeFirst();
+      TsFileSequenceReader reader = readerMetadataPair.left;
+      for (ChunkMetadata chunkMetadata : readerMetadataPair.right) {
+        try {
+          writeSingleChunk(device, p, chunkMetadata, reader, session);
+        } catch (Throwable t) {
+          // best effort: report the broken chunk and continue with the next one
+          System.out.printf(
+              "Skip broken chunk in device %s.%s: %s%n", device, p.getMeasurement(), t);
+        }
+      }
+    }
+  }
+
+  /**
+   * Read one chunk of a not-aligned series and write its points to IoTDB in tablets of at most
+   * {@code MAX_TABLET_LENGTH} rows.
+   *
+   * @param device device the series belongs to
+   * @param p full path of the series; its measurement name becomes the tablet's only column
+   * @param chunkMetadata metadata of the chunk to read
+   * @param reader reader of the file that contains the chunk
+   * @param session IoTDB session the points are written to
+   */
+  protected static void writeSingleChunk(
+      String device,
+      PartialPath p,
+      ChunkMetadata chunkMetadata,
+      TsFileSequenceReader reader,
+      Session session)
+      throws IOException, IoTDBConnectionException, StatementExecutionException {
+    Chunk chunk = reader.readMemChunk(chunkMetadata);
+    ChunkHeader chunkHeader = chunk.getHeader();
+    String measurementId = p.getMeasurement();
+    MeasurementSchema schema =
+        new MeasurementSchema(
+            measurementId,
+            chunkHeader.getDataType(),
+            chunkHeader.getEncodingType(),
+            chunkHeader.getCompressionType());
+    Tablet tablet = new Tablet(device, Collections.singletonList(schema), MAX_TABLET_LENGTH);
+    IChunkReader chunkReader = new ChunkReader(chunk, null);
+    while (chunkReader.hasNextSatisfiedPage()) {
+      IPointReader batchIterator = chunkReader.nextPageData().getBatchDataIterator();
+      while (batchIterator.hasNextTimeValuePair()) {
+        TimeValuePair timeValuePair = batchIterator.nextTimeValuePair();
+        // Tablet.values holds one array per column, so cells must be set via the tablet API;
+        // the column arrays store the raw Java value, not the TsPrimitiveType wrapper
+        int row = tablet.rowSize++;
+        tablet.addTimestamp(row, timeValuePair.getTimestamp());
+        tablet.addValue(measurementId, row, timeValuePair.getValue().getValue());
+        if (tablet.rowSize >= MAX_TABLET_LENGTH) {
+          session.insertTablet(tablet);
+          tablet.reset();
+        }
+      }
+    }
+    if (tablet.rowSize > 0) {
+      session.insertTablet(tablet);
+      tablet.reset();
+    }
+  }
+
+  /**
+   * Collect one schema per measurement of an aligned device, sorted by measurement id.
+   *
+   * <p>Each schema is built from the header of the first value chunk seen for that measurement.
+   */
+  private static List<IMeasurementSchema> collectSchemaFromAlignedChunkMetadataList(
+      LinkedList<Pair<TsFileSequenceReader, List<AlignedChunkMetadata>>> readerAndChunkMetadataList)
+      throws IOException {
+    List<IMeasurementSchema> schemaList = new ArrayList<>();
+    Set<String> measurementSet = new HashSet<>();
+    for (Pair<TsFileSequenceReader, List<AlignedChunkMetadata>> readerListPair :
+        readerAndChunkMetadataList) {
+      TsFileSequenceReader reader = readerListPair.left;
+      for (AlignedChunkMetadata alignedChunkMetadata : readerListPair.right) {
+        for (IChunkMetadata chunkMetadata : alignedChunkMetadata.getValueChunkMetadataList()) {
+          // skip missing value chunks and measurements whose schema is already known
+          if (chunkMetadata == null || !measurementSet.add(chunkMetadata.getMeasurementUid())) {
+            continue;
+          }
+          // read through the reader that owns this metadata rather than the global ChunkCache
+          Chunk chunk = reader.readMemChunk((ChunkMetadata) chunkMetadata);
+          ChunkHeader header = chunk.getHeader();
+          schemaList.add(
+              new MeasurementSchema(
+                  header.getMeasurementID(),
+                  header.getDataType(),
+                  header.getEncodingType(),
+                  header.getCompressionType()));
+        }
+      }
+    }
+    schemaList.sort(Comparator.comparing(IMeasurementSchema::getMeasurementId));
+    return schemaList;
+  }
+
+  /**
+   * Write the current aligned device of {@code deviceIterator}, file by file, using a single
+   * schema list that covers every measurement of the device.
+   */
+  protected static void writeAlignedSeries(
+      String device, MultiTsFileDeviceIterator deviceIterator, Session session)
+      throws IOException, IoTDBConnectionException, StatementExecutionException {
+    LinkedList<Pair<TsFileSequenceReader, List<AlignedChunkMetadata>>> readerAndChunkMetadataList =
+        deviceIterator.getReaderAndChunkMetadataForCurrentAlignedSeries();
+    List<IMeasurementSchema> iSchemaList =
+        collectSchemaFromAlignedChunkMetadataList(readerAndChunkMetadataList);
+    // Tablet takes MeasurementSchema; the collected schemas are created as that type
+    List<MeasurementSchema> schemaList = new ArrayList<>();
+    iSchemaList.forEach(x -> schemaList.add((MeasurementSchema) x));
+    while (!readerAndChunkMetadataList.isEmpty()) {
+      Pair<TsFileSequenceReader, List<AlignedChunkMetadata>> readerListPair =
+          readerAndChunkMetadataList.removeFirst();
+      TsFileAlignedSeriesReaderIterator readerIterator =
+          new TsFileAlignedSeriesReaderIterator(
+              readerListPair.left, readerListPair.right, iSchemaList);
+      writeAlignedChunk(readerIterator, device, schemaList, session);
+    }
+  }
+
+  /**
+   * Write every chunk produced by {@code readerIterator} to IoTDB as aligned tablets of at most
+   * {@code MAX_TABLET_LENGTH} rows.
+   *
+   * <p>Value {@code i} of each row goes to column {@code schemaList.get(i)}; a null value is only
+   * marked in the tablet's bitmap. NOTE(review): assumes the iterator emits value columns in the
+   * order of the schema list it was built with, which must match {@code schemaList} (TODO confirm).
+   */
+  private static void writeAlignedChunk(
+      TsFileAlignedSeriesReaderIterator readerIterator,
+      String device,
+      List<MeasurementSchema> schemaList,
+      Session session)
+      throws IOException, IoTDBConnectionException, StatementExecutionException {
+    while (readerIterator.hasNext()) {
+      Tablet tablet = new Tablet(device, schemaList, MAX_TABLET_LENGTH);
+      tablet.initBitMaps();
+      AlignedChunkReader alignedChunkReader = readerIterator.nextReader().left;
+      while (alignedChunkReader.hasNextSatisfiedPage()) {
+        IBatchDataIterator batchDataIterator =
+            alignedChunkReader.nextPageData().getBatchDataIterator();
+        while (batchDataIterator.hasNext()) {
+          TsPrimitiveType[] pointsData = (TsPrimitiveType[]) batchDataIterator.currentValue();
+          // fill the row column by column; Tablet.values is indexed by column, not by row
+          int row = tablet.rowSize++;
+          tablet.addTimestamp(row, batchDataIterator.currentTime());
+          for (int i = 0; i < pointsData.length; i++) {
+            if (pointsData[i] == null) {
+              tablet.bitMaps[i].mark(row);
+            } else {
+              tablet.addValue(
+                  schemaList.get(i).getMeasurementId(), row, pointsData[i].getValue());
+            }
+          }
+          batchDataIterator.next();
+          if (tablet.rowSize >= MAX_TABLET_LENGTH) {
+            session.insertAlignedTablet(tablet);
+            tablet.reset();
+          }
+        }
+      }
+      if (tablet.rowSize > 0) {
+        session.insertAlignedTablet(tablet);
+        tablet.reset();
+      }
+    }
+  }
 }