You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ma...@apache.org on 2022/10/14 09:05:24 UTC

[iotdb] branch IOTDB-4650 created (now b86bb650ae)

This is an automated email from the ASF dual-hosted git repository.

marklau99 pushed a change to branch IOTDB-4650
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at b86bb650ae add read from tail

This branch includes the following new commits:

     new b86bb650ae add read from tail

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: add read from tail

Posted by ma...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

marklau99 pushed a commit to branch IOTDB-4650
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit b86bb650ae121dfaea4a27fc9673502592cbbbbf
Author: Liu Xuxin <li...@outlook.com>
AuthorDate: Fri Oct 14 17:05:04 2022 +0800

    add read from tail
---
 .../java/org/apache/iotdb/RewriteTsFileTool.java   | 226 ++++++++++++++++++++-
 1 file changed, 217 insertions(+), 9 deletions(-)

diff --git a/rewrite-tsfile-tool/src/main/java/org/apache/iotdb/RewriteTsFileTool.java b/rewrite-tsfile-tool/src/main/java/org/apache/iotdb/RewriteTsFileTool.java
index 653f2de67c..f45b3457c3 100644
--- a/rewrite-tsfile-tool/src/main/java/org/apache/iotdb/RewriteTsFileTool.java
+++ b/rewrite-tsfile-tool/src/main/java/org/apache/iotdb/RewriteTsFileTool.java
@@ -21,9 +21,12 @@ package org.apache.iotdb;
 
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.engine.cache.ChunkCache;
+import org.apache.iotdb.db.engine.compaction.inner.utils.MultiTsFileDeviceIterator;
 import org.apache.iotdb.db.engine.modification.Deletion;
 import org.apache.iotdb.db.engine.modification.Modification;
 import org.apache.iotdb.db.engine.modification.ModificationFile;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.utils.QueryUtils;
 import org.apache.iotdb.rpc.IoTDBConnectionException;
 import org.apache.iotdb.rpc.StatementExecutionException;
@@ -35,11 +38,16 @@ import org.apache.iotdb.tsfile.file.MetaMarker;
 import org.apache.iotdb.tsfile.file.header.ChunkGroupHeader;
 import org.apache.iotdb.tsfile.file.header.ChunkHeader;
 import org.apache.iotdb.tsfile.file.metadata.AlignedChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
 import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
+import org.apache.iotdb.tsfile.read.TimeValuePair;
+import org.apache.iotdb.tsfile.read.TsFileAlignedSeriesReaderIterator;
 import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
+import org.apache.iotdb.tsfile.read.common.Chunk;
 import org.apache.iotdb.tsfile.read.common.Field;
+import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
 import org.apache.iotdb.tsfile.read.common.Path;
 import org.apache.iotdb.tsfile.read.common.RowRecord;
 import org.apache.iotdb.tsfile.read.controller.CachedChunkLoaderImpl;
@@ -48,11 +56,18 @@ import org.apache.iotdb.tsfile.read.controller.IMetadataQuerier;
 import org.apache.iotdb.tsfile.read.controller.MetadataQuerierByFileImpl;
 import org.apache.iotdb.tsfile.read.query.dataset.DataSetWithoutTimeGenerator;
 import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+import org.apache.iotdb.tsfile.read.reader.IChunkReader;
+import org.apache.iotdb.tsfile.read.reader.IPointReader;
+import org.apache.iotdb.tsfile.read.reader.chunk.AlignedChunkReader;
+import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
 import org.apache.iotdb.tsfile.read.reader.series.AbstractFileSeriesReader;
 import org.apache.iotdb.tsfile.read.reader.series.EmptyFileSeriesReader;
 import org.apache.iotdb.tsfile.read.reader.series.FileSeriesReader;
 import org.apache.iotdb.tsfile.utils.FilePathUtils;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
 import org.apache.iotdb.tsfile.write.record.Tablet;
+import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
 import org.apache.commons.cli.CommandLine;
@@ -66,9 +81,11 @@ import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -81,6 +98,7 @@ public class RewriteTsFileTool {
   private static String user = "root";
   private static String password = "root";
   private static String filePath = "";
+  private static String readMode = "s";
 
   private static Map<String, Set<MeasurementSchema>> device2Measurements;
 
@@ -119,6 +137,7 @@ public class RewriteTsFileTool {
       user = getArgOrDefault(commandLine, "u", user);
       password = getArgOrDefault(commandLine, "pw", password);
       filePath = getArgOrDefault(commandLine, "f", filePath);
+      readMode = getArgOrDefault(commandLine, "rm", readMode);
     } catch (ParseException e) {
       System.out.printf("Parse Args Error. %s%n", e.getMessage());
       priHelp(options);
@@ -175,6 +194,15 @@ public class RewriteTsFileTool {
             .required()
             .build();
     options.addOption(filePathOpt);
+
+    Option readModeOpt =
+        Option.builder("rm")
+            .argName("readMode")
+            .hasArg()
+            .desc("Read mode, s(equence) or r(everse)")
+            .required()
+            .build();
+    options.addOption(readModeOpt);
     return options;
   }
 
@@ -207,21 +235,37 @@ public class RewriteTsFileTool {
     List<File> unloadTsFiles = new ArrayList<>();
     System.out.printf("Collect TsFiles successfully, %d files to be loaded.%n", size);
     System.out.println("Start Loading TsFiles...");
-    for (int i = 0; i < size; i++) {
-      File file = files.get(i);
-      System.out.printf("Loading %s(%d/%d)...", file.getPath(), i + 1, size);
+    if (readMode.equals("s")) {
+      for (int i = 0; i < size; i++) {
+        File file = files.get(i);
+        System.out.printf("Loading %s(%d/%d)...", file.getPath(), i + 1, size);
+        try {
+          seqWriteTsFile(file.getPath(), session);
+        } catch (Exception e) {
+          System.out.println(
+              "------------------------------Error Message------------------------------");
+          e.printStackTrace();
+          System.out.println(
+              "------------------------------End Message------------------------------");
+          unloadTsFiles.add(file);
+          continue;
+        }
+        System.out.println("Done");
+      }
+    } else {
       try {
-        writeTsFile(file.getPath(), session);
-      } catch (Exception e) {
+        reverseWriteTsFile(files, session);
+      } catch (IOException
+          | IllegalPathException
+          | IoTDBConnectionException
+          | StatementExecutionException
+          | NoMeasurementException e) {
         System.out.println(
             "------------------------------Error Message------------------------------");
         e.printStackTrace();
         System.out.println(
             "------------------------------End Message------------------------------");
-        unloadTsFiles.add(file);
-        continue;
       }
-      System.out.println("Done");
     }
     System.out.println("Finish Loading TsFiles");
     System.out.printf(
@@ -262,7 +306,7 @@ public class RewriteTsFileTool {
    * @param filename the file path to be loaded
    * @param session IoTDB session
    */
-  public static void writeTsFile(String filename, Session session)
+  public static void seqWriteTsFile(String filename, Session session)
       throws IOException, IllegalPathException, IoTDBConnectionException,
           StatementExecutionException, NoMeasurementException {
     // parse modifications from .mods
@@ -425,4 +469,168 @@ public class RewriteTsFileTool {
           alignedChunkMetadataList, Collections.singletonList(measurementModifications));
     }
   }
+
+  /**
+   * Loads the given TsFiles device by device (read mode other than "s") and writes their data
+   * into IoTDB through {@code session}.
+   *
+   * <p>All files are wrapped in {@link TsFileResource}s and iterated together with a {@link
+   * MultiTsFileDeviceIterator}. Aligned devices are handed to {@link #writeAlignedSeries};
+   * non-aligned devices are written series by series through {@link #writeSingleSeries}.
+   *
+   * <p>A failure while writing one aligned device is reported and that device is skipped, so the
+   * remaining devices are still loaded.
+   *
+   * @param files the TsFiles to be loaded
+   * @param session IoTDB session used for the inserts
+   */
+  public static void reverseWriteTsFile(List<File> files, Session session)
+      throws IOException, IllegalPathException, IoTDBConnectionException,
+          StatementExecutionException, NoMeasurementException {
+    List<TsFileResource> resources = new ArrayList<>(files.size());
+    for (File file : files) {
+      resources.add(new TsFileResource(file));
+    }
+    try (MultiTsFileDeviceIterator deviceIterator = new MultiTsFileDeviceIterator(resources)) {
+      while (deviceIterator.hasNextDevice()) {
+        Pair<String, Boolean> devicePair = deviceIterator.nextDevice();
+        String device = devicePair.left;
+        boolean isAligned = devicePair.right;
+        if (isAligned) {
+          try {
+            writeAlignedSeries(device, deviceIterator, session);
+          } catch (Exception e) {
+            // Best effort: skip the aligned data of this device, but report why it failed.
+            // Only Exception is caught so that JVM Errors (e.g. OutOfMemoryError) still surface.
+            System.out.println("Skip aligned chunk " + device);
+            e.printStackTrace();
+          }
+        } else {
+          MultiTsFileDeviceIterator.MeasurementIterator seriesIterator =
+              deviceIterator.iterateNotAlignedSeries(device, true);
+          while (seriesIterator.hasNextSeries()) {
+            writeSingleSeries(device, seriesIterator, session);
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Writes all chunks of the next non-aligned series of {@code seriesIterator} into IoTDB.
+   *
+   * <p>The series name is taken from {@code seriesIterator.nextSeries()}, so each call advances
+   * the iterator by one series. Chunks are written one at a time through {@link
+   * #writeSingleChunk}; a chunk that fails is reported and skipped so the remaining chunks are
+   * still written.
+   *
+   * @param device the device the series belongs to
+   * @param seriesIterator iterator positioned before the series to write
+   * @param session IoTDB session used for the inserts
+   */
+  protected static void writeSingleSeries(
+      String device, MultiTsFileDeviceIterator.MeasurementIterator seriesIterator, Session session)
+      throws IllegalPathException, IOException, IoTDBConnectionException,
+          StatementExecutionException {
+    PartialPath p = new PartialPath(device, seriesIterator.nextSeries());
+    LinkedList<Pair<TsFileSequenceReader, List<ChunkMetadata>>> readerAndChunkMetadataList =
+        seriesIterator.getMetadataListForCurrentSeries();
+    while (!readerAndChunkMetadataList.isEmpty()) {
+      Pair<TsFileSequenceReader, List<ChunkMetadata>> readerMetadataPair =
+          readerAndChunkMetadataList.removeFirst();
+      TsFileSequenceReader reader = readerMetadataPair.left;
+      List<ChunkMetadata> chunkMetadataList = readerMetadataPair.right;
+      for (ChunkMetadata chunkMetadata : chunkMetadataList) {
+        try {
+          writeSingleChunk(device, p, chunkMetadata, reader, session);
+        } catch (Exception e) {
+          // Best effort: skip this chunk, but report the cause instead of hiding it.
+          // Only Exception is caught so that JVM Errors (e.g. OutOfMemoryError) still surface.
+          System.out.printf("Skip broken chunk in device %s.%s%n", device, p.getMeasurement());
+          e.printStackTrace();
+        }
+      }
+    }
+  }
+
+  protected static void writeSingleChunk(
+      String device,
+      PartialPath p,
+      ChunkMetadata chunkMetadata,
+      TsFileSequenceReader reader,
+      Session session)
+      throws IOException, IoTDBConnectionException, StatementExecutionException {
+    Chunk chunk = reader.readMemChunk(chunkMetadata);
+    ChunkHeader chunkHeader = chunk.getHeader();
+    MeasurementSchema schema =
+        new MeasurementSchema(
+            p.getMeasurement(),
+            chunkHeader.getDataType(),
+            chunkHeader.getEncodingType(),
+            chunkHeader.getCompressionType());
+    Tablet tablet = new Tablet(device, Collections.singletonList(schema), 1024);
+    IChunkReader chunkReader = new ChunkReader(chunk, null);
+    while (chunkReader.hasNextSatisfiedPage()) {
+      IPointReader batchIterator = chunkReader.nextPageData().getBatchDataIterator();
+      while (batchIterator.hasNextTimeValuePair()) {
+        TimeValuePair timeValuePair = batchIterator.nextTimeValuePair();
+        tablet.timestamps[tablet.rowSize] = timeValuePair.getTimestamp();
+        tablet.values[tablet.rowSize++] = timeValuePair.getValue();
+        if (tablet.rowSize >= 1024) {
+          session.insertTablet(tablet);
+          tablet.reset();
+        }
+      }
+    }
+    if (tablet.rowSize > 0) {
+      session.insertTablet(tablet);
+      tablet.reset();
+    }
+  }
+
+  /**
+   * Collects one measurement schema per distinct value column found in the given aligned chunk
+   * metadata and returns the schemas sorted by measurement id.
+   *
+   * <p>For each measurement only the first non-null value chunk metadata encountered is used; its
+   * chunk is fetched through {@link ChunkCache} and the schema is built from the chunk header.
+   *
+   * @param readerAndChunkMetadataList aligned chunk metadata lists paired with their file readers
+   * @return the distinct measurement schemas, ordered by measurement id
+   */
+  private static List<IMeasurementSchema> collectSchemaFromAlignedChunkMetadataList(
+      LinkedList<Pair<TsFileSequenceReader, List<AlignedChunkMetadata>>> readerAndChunkMetadataList)
+      throws IOException {
+    Set<String> seenMeasurements = new HashSet<>();
+    Set<MeasurementSchema> collected = new HashSet<>();
+    for (Pair<TsFileSequenceReader, List<AlignedChunkMetadata>> entry :
+        readerAndChunkMetadataList) {
+      for (AlignedChunkMetadata alignedMetadata : entry.right) {
+        for (IChunkMetadata valueMetadata : alignedMetadata.getValueChunkMetadataList()) {
+          // Skip absent value columns and measurements whose schema was already collected.
+          if (valueMetadata == null
+              || !seenMeasurements.add(valueMetadata.getMeasurementUid())) {
+            continue;
+          }
+          ChunkHeader header =
+              ChunkCache.getInstance().get((ChunkMetadata) valueMetadata).getHeader();
+          MeasurementSchema schema =
+              new MeasurementSchema(
+                  header.getMeasurementID(),
+                  header.getDataType(),
+                  header.getEncodingType(),
+                  header.getCompressionType());
+          collected.add(schema);
+        }
+      }
+    }
+    List<IMeasurementSchema> sorted = new ArrayList<>(collected);
+    sorted.sort(Comparator.comparing(IMeasurementSchema::getMeasurementId));
+    return sorted;
+  }
+
+  protected static void writeAlignedSeries(
+      String device, MultiTsFileDeviceIterator deviceIterator, Session session)
+      throws IOException, IoTDBConnectionException, StatementExecutionException {
+    LinkedList<Pair<TsFileSequenceReader, List<AlignedChunkMetadata>>> readerAndChunkMetadataList =
+        deviceIterator.getReaderAndChunkMetadataForCurrentAlignedSeries();
+    List<MeasurementSchema> schemaList = new ArrayList<>();
+    List<IMeasurementSchema> iSchemaList =
+        collectSchemaFromAlignedChunkMetadataList(readerAndChunkMetadataList);
+    iSchemaList.forEach(x -> schemaList.add((MeasurementSchema) x));
+    while (readerAndChunkMetadataList.size() > 0) {
+      Pair<TsFileSequenceReader, List<AlignedChunkMetadata>> readerListPair =
+          readerAndChunkMetadataList.removeFirst();
+      TsFileSequenceReader reader = readerListPair.left;
+      List<AlignedChunkMetadata> alignedChunkMetadataList = readerListPair.right;
+      TsFileAlignedSeriesReaderIterator readerIterator =
+          new TsFileAlignedSeriesReaderIterator(reader, alignedChunkMetadataList, iSchemaList);
+      while (readerIterator.hasNext()) {
+        Tablet tablet = new Tablet(device, schemaList, 1024);
+        Pair<AlignedChunkReader, Long> chunkReaderAndChunkSize = readerIterator.nextReader();
+        AlignedChunkReader alignedChunkReader = chunkReaderAndChunkSize.left;
+        while (alignedChunkReader.hasNextSatisfiedPage()) {
+          IBatchDataIterator batchDataIterator =
+              alignedChunkReader.nextPageData().getBatchDataIterator();
+          while (batchDataIterator.hasNext()) {
+            TsPrimitiveType[] pointsData = (TsPrimitiveType[]) batchDataIterator.currentValue();
+            tablet.timestamps[tablet.rowSize] = batchDataIterator.currentTime();
+            tablet.values[tablet.rowSize++] = batchDataIterator.currentValue();
+            batchDataIterator.next();
+            if (tablet.rowSize >= 1024) {
+              session.insertAlignedTablet(tablet);
+              tablet.reset();
+            }
+          }
+        }
+        if (tablet.rowSize > 0) {
+          session.insertAlignedTablet(tablet);
+          tablet.reset();
+        }
+      }
+    }
+  }
 }