You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2022/08/12 03:14:33 UTC

[iotdb] branch rel/0.13 updated: [To rel/0.13][IOTDB-4057]Support aligned timeseries in rewriteTsFileTool (#6972)

This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch rel/0.13
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/0.13 by this push:
     new 865711ccae [To rel/0.13][IOTDB-4057]Support aligned timeseries in rewriteTsFileTool (#6972)
865711ccae is described below

commit 865711ccaec355686966cdf685dc6b29c9bcf913
Author: Mrquan <50...@users.noreply.github.com>
AuthorDate: Fri Aug 12 11:14:28 2022 +0800

    [To rel/0.13][IOTDB-4057]Support aligned timeseries in rewriteTsFileTool (#6972)
---
 .../java/org/apache/iotdb/RewriteFileTool.java     | 195 ++++++++++++++++++++-
 .../apache/iotdb/tsfile/write/record/Tablet.java   |  63 +++++++
 2 files changed, 255 insertions(+), 3 deletions(-)

diff --git a/rewriteFileTool/src/main/java/org/apache/iotdb/RewriteFileTool.java b/rewriteFileTool/src/main/java/org/apache/iotdb/RewriteFileTool.java
index 6ad579b025..df0805844e 100644
--- a/rewriteFileTool/src/main/java/org/apache/iotdb/RewriteFileTool.java
+++ b/rewriteFileTool/src/main/java/org/apache/iotdb/RewriteFileTool.java
@@ -43,8 +43,10 @@ import org.apache.iotdb.tsfile.fileSystem.fsFactory.FSFactory;
 import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
 import org.apache.iotdb.tsfile.read.common.BatchData;
 import org.apache.iotdb.tsfile.read.common.TimeRange;
+import org.apache.iotdb.tsfile.read.reader.page.AlignedPageReader;
 import org.apache.iotdb.tsfile.read.reader.page.PageReader;
 import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
 import org.apache.iotdb.tsfile.write.record.Tablet;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
@@ -55,6 +57,7 @@ import java.io.FileWriter;
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
@@ -195,12 +198,69 @@ public class RewriteFileTool {
       byte marker;
       String curDevice = null;
       long chunkHeaderOffset = -1;
+
+      // Used for rewriting aligned chunk group
+      List<PageHeader> timePageHeaders = new ArrayList<>();
+      List<ByteBuffer> timePageDatas = new ArrayList<>();
+      List<List<PageHeader>> valuePageHeadersList = new ArrayList<>();
+      List<List<ByteBuffer>> valuePageDatasList = new ArrayList<>();
+      List<MeasurementSchema> measurementSchemas = new ArrayList<>();
+      List<List<TimeRange>> deleteIntervalsList = new ArrayList<>();
+
       while ((marker = reader.readMarker()) != MetaMarker.SEPARATOR) {
         switch (marker) {
+          case MetaMarker.TIME_CHUNK_HEADER:
+          case MetaMarker.ONLY_ONE_PAGE_TIME_CHUNK_HEADER:
+            ChunkHeader header = reader.readChunkHeader(marker);
+            int dataSize = header.getDataSize();
+            while (dataSize > 0) {
+              PageHeader pageHeader =
+                  reader.readPageHeader(
+                      header.getDataType(), header.getChunkType() == MetaMarker.CHUNK_HEADER);
+              ByteBuffer pageData = reader.readPage(pageHeader, header.getCompressionType());
+              timePageHeaders.add(pageHeader);
+              timePageDatas.add(pageData);
+              dataSize -= pageHeader.getSerializedPageSize();
+            }
+            break;
+          case MetaMarker.VALUE_CHUNK_HEADER:
+          case MetaMarker.ONLY_ONE_PAGE_VALUE_CHUNK_HEADER:
+            chunkHeaderOffset = reader.position() - 1;
+            header = reader.readChunkHeader(marker);
+            MeasurementSchema measurementSchema =
+                new MeasurementSchema(
+                    header.getMeasurementID(),
+                    header.getDataType(),
+                    header.getEncodingType(),
+                    header.getCompressionType());
+            measurementSchemas.add(measurementSchema);
+
+            // read delete time range from old modification file
+            deleteIntervalsList.add(
+                getOldSortedDeleteIntervals(
+                    curDevice, measurementSchema, chunkHeaderOffset, modifications));
+
+            dataSize = header.getDataSize();
+            int pageIndex = 0;
+            while (dataSize > 0) {
+              PageHeader pageHeader =
+                  reader.readPageHeader(
+                      header.getDataType(), header.getChunkType() == MetaMarker.CHUNK_HEADER);
+              ByteBuffer pageData = reader.readPage(pageHeader, header.getCompressionType());
+              if (valuePageHeadersList.size() == pageIndex) {
+                valuePageHeadersList.add(new ArrayList<>());
+                valuePageDatasList.add(new ArrayList<>());
+              }
+              valuePageHeadersList.get(pageIndex).add(pageHeader);
+              valuePageDatasList.get(pageIndex).add(pageData);
+              pageIndex++;
+              dataSize -= pageHeader.getSerializedPageSize();
+            }
+            break;
           case MetaMarker.CHUNK_HEADER:
           case MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER:
             chunkHeaderOffset = reader.position() - 1;
-            ChunkHeader header = reader.readChunkHeader(marker);
+            header = reader.readChunkHeader(marker);
             Decoder defaultTimeDecoder =
                 Decoder.getDecoderByType(
                     TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder()),
@@ -209,14 +269,14 @@ public class RewriteFileTool {
                 Decoder.getDecoderByType(header.getEncodingType(), header.getDataType());
             // 1. construct MeasurementSchema from chunkHeader
             String measurement = header.getMeasurementID();
-            MeasurementSchema measurementSchema =
+            measurementSchema =
                 new MeasurementSchema(
                     measurement,
                     header.getDataType(),
                     header.getEncodingType(),
                     header.getCompressionType());
             // 2. record data point of each measurement
-            int dataSize = header.getDataSize();
+            dataSize = header.getDataSize();
             while (dataSize > 0) {
               valueDecoder.reset();
               PageHeader pageHeader =
@@ -282,10 +342,32 @@ public class RewriteFileTool {
             }
             break;
           case MetaMarker.CHUNK_GROUP_HEADER:
+            if (!timePageHeaders.isEmpty()) {
+              rewriteAlignedChunkGroup(
+                  timePageHeaders,
+                  timePageDatas,
+                  valuePageHeadersList,
+                  valuePageDatasList,
+                  measurementSchemas,
+                  deleteIntervalsList,
+                  curDevice,
+                  session);
+            }
             ChunkGroupHeader chunkGroupHeader = reader.readChunkGroupHeader();
             curDevice = chunkGroupHeader.getDeviceID();
             break;
           case MetaMarker.OPERATION_INDEX_RANGE:
+            if (!timePageHeaders.isEmpty()) {
+              rewriteAlignedChunkGroup(
+                  timePageHeaders,
+                  timePageDatas,
+                  valuePageHeadersList,
+                  valuePageDatasList,
+                  measurementSchemas,
+                  deleteIntervalsList,
+                  curDevice,
+                  session);
+            }
             reader.readPlanIndex();
             reader.getMinPlanIndex();
             reader.getMaxPlanIndex();
@@ -294,6 +376,17 @@ public class RewriteFileTool {
             MetaMarker.handleUnexpectedMarker(marker);
         }
       }
+      if (!timePageHeaders.isEmpty()) {
+        rewriteAlignedChunkGroup(
+            timePageHeaders,
+            timePageDatas,
+            valuePageHeadersList,
+            valuePageDatasList,
+            measurementSchemas,
+            deleteIntervalsList,
+            curDevice,
+            session);
+      }
     } catch (IllegalPathException
         | IOException
         | IoTDBConnectionException
@@ -304,6 +397,102 @@ public class RewriteFileTool {
     }
   }
 
+  /**
+   * Replays one buffered aligned chunk group into IoTDB with {@code Session.insertAlignedTablet}.
+   *
+   * <p>Page {@code i} of the time chunk is decoded together with page {@code i} of every value
+   * chunk through an {@link AlignedPageReader}, the deleted time ranges are applied, and the
+   * surviving rows are sent in tablets that are flushed once their estimated payload reaches
+   * {@code MAX_TABLET_SIZE}. All buffers passed in are cleared before returning so the caller can
+   * reuse them for the next chunk group.
+   *
+   * @param timePageHeaders page headers of the time chunk, one per page
+   * @param timePageDatas raw page bodies of the time chunk, parallel to {@code timePageHeaders}
+   * @param valuePageHeadersList for each page index, the page headers of all value chunks
+   * @param valuePageDatasList for each page index, the raw page bodies of all value chunks
+   * @param measurementSchemas schema of each value column, in value-chunk order
+   * @param deleteIntervalsList deleted time ranges of each value column, parallel to {@code
+   *     measurementSchemas}
+   * @param curDevice device the chunk group belongs to
+   * @param session session used to write the rewritten data
+   * @throws UnSupportedDataTypeException if a value column has an unsupported data type
+   */
+  private static void rewriteAlignedChunkGroup(
+      List<PageHeader> timePageHeaders,
+      List<ByteBuffer> timePageDatas,
+      List<List<PageHeader>> valuePageHeadersList,
+      List<List<ByteBuffer>> valuePageDatasList,
+      List<MeasurementSchema> measurementSchemas,
+      List<List<TimeRange>> deleteIntervalsList,
+      String curDevice,
+      Session session)
+      throws IOException, IoTDBConnectionException, StatementExecutionException {
+    // NOTE(review): the time column is decoded with the globally configured time encoder, not the
+    // encoding recorded in the time chunk header (which is not passed in here) — confirm they match.
+    Decoder defaultTimeDecoder =
+        Decoder.getDecoderByType(
+            TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder()),
+            TSDataType.INT64);
+
+    List<Decoder> valueDecoders = new ArrayList<>(measurementSchemas.size());
+    List<TSDataType> dataTypes = new ArrayList<>(measurementSchemas.size());
+    for (MeasurementSchema measurementSchema : measurementSchemas) {
+      valueDecoders.add(
+          Decoder.getDecoderByType(
+              measurementSchema.getEncodingType(), measurementSchema.getType()));
+      dataTypes.add(measurementSchema.getType());
+    }
+
+    for (int pageIndex = 0; pageIndex < timePageHeaders.size(); pageIndex++) {
+      AlignedPageReader alignedPageReader =
+          new AlignedPageReader(
+              timePageHeaders.get(pageIndex),
+              timePageDatas.get(pageIndex),
+              defaultTimeDecoder,
+              valuePageHeadersList.get(pageIndex),
+              valuePageDatasList.get(pageIndex),
+              dataTypes,
+              valueDecoders,
+              null);
+      alignedPageReader.setDeleteIntervalList(deleteIntervalsList);
+      BatchData batchData = alignedPageReader.getAllSatisfiedPageData();
+
+      Tablet tablet = new Tablet(curDevice, measurementSchemas, batchData.length());
+      long curTabletSize = 0;
+      while (batchData.hasCurrent()) {
+        // an aligned row is a TsPrimitiveType[] with one cell per value column; cells may be null
+        TsPrimitiveType[] rowValues = (TsPrimitiveType[]) batchData.currentValue();
+        tablet.addTimestamp(tablet.rowSize, batchData.currentTime());
+        tablet.addValues(tablet.rowSize, rowValues);
+        tablet.rowSize++;
+        curTabletSize += estimateAlignedRowSize(dataTypes, rowValues);
+
+        if (curTabletSize >= MAX_TABLET_SIZE) {
+          session.insertAlignedTablet(tablet);
+          curTabletSize = 0;
+          tablet.reset();
+        }
+        batchData.next();
+      }
+      if (tablet.rowSize > 0) {
+        session.insertAlignedTablet(tablet);
+      }
+
+      // the decoders are shared by all pages of the group, so clear their state between pages
+      for (Decoder decoder : valueDecoders) {
+        decoder.reset();
+      }
+      defaultTimeDecoder.reset();
+    }
+
+    timePageHeaders.clear();
+    timePageDatas.clear();
+    valuePageHeadersList.clear();
+    valuePageDatasList.clear();
+    measurementSchemas.clear();
+    deleteIntervalsList.clear();
+  }
+
+  /**
+   * Estimates how many bytes one aligned row adds to a tablet.
+   *
+   * <p>The estimate is 8 bytes for the timestamp plus, per column, the fixed width of its type. A
+   * TEXT column counts 4 bytes plus the length of its content.
+   *
+   * @param dataTypes data type of each value column
+   * @param rowValues cells of the row, parallel to {@code dataTypes}; entries may be {@code null}
+   * @return estimated size of the row in bytes
+   * @throws UnSupportedDataTypeException if a column has an unsupported data type
+   */
+  private static long estimateAlignedRowSize(
+      List<TSDataType> dataTypes, TsPrimitiveType[] rowValues) {
+    long size = 8; // timestamp
+    for (int i = 0; i < dataTypes.size(); i++) {
+      TSDataType dataType = dataTypes.get(i);
+      switch (dataType) {
+        case BOOLEAN:
+          size += 1;
+          break;
+        case INT32:
+        case FLOAT:
+          size += 4;
+          break;
+        case INT64:
+        case DOUBLE:
+          size += 8;
+          break;
+        case TEXT:
+          // the row is a TsPrimitiveType[], not a Binary: measure this column's own cell
+          size += 4;
+          if (rowValues[i] != null) {
+            size += rowValues[i].getBinary().getLength();
+          }
+          break;
+        default:
+          throw new UnSupportedDataTypeException(
+              String.format("Data type %s is not supported.", dataType));
+      }
+    }
+    return size;
+  }
+
   private static boolean checkArgs(String[] args) {
     String paramConfig =
         "-b=[path of backUp directory] -vf=[path of validation file]/-f=[path of tsfile list] -o=[path of output log] -u=[username, default=\"root\"] -pw=[password, default=\"root\"]";
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/Tablet.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/Tablet.java
index bb66da22a9..72e055da5c 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/Tablet.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/Tablet.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.utils.Binary;
 import org.apache.iotdb.tsfile.utils.BitMap;
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
 import java.util.ArrayList;
@@ -121,12 +122,74 @@ public class Tablet {
     timestamps[rowIndex] = timestamp;
   }
 
+  /**
+   * Fills every column of row {@code rowIndex} from an array of {@link TsPrimitiveType} cells.
+   *
+   * <p>{@code rowValues[i]} is written to the column described by the i-th schema of this tablet.
+   * A {@code null} cell is recorded as a null value. Cells beyond the number of schemas are
+   * ignored.
+   *
+   * @param rowIndex row to fill
+   * @param rowValues one cell per schema, in schema order; entries may be {@code null}
+   * @throws ArrayIndexOutOfBoundsException if {@code rowValues} has fewer entries than the tablet
+   *     has schemas
+   */
+  public void addValues(int rowIndex, TsPrimitiveType[] rowValues) {
+    // not named "values" so it does not shadow the column-buffer field that
+    // addValueOfDataType writes into
+    for (int i = 0; i < schemas.size(); i++) {
+      addValueOfDataType(schemas.get(i).getType(), rowIndex, i, rowValues[i]);
+    }
+  }
+
+  /**
+   * Sets the value of column {@code measurementId} at row {@code rowIndex}.
+   *
+   * <p>The column index is looked up in {@code measurementIndex}. The value is then stored through
+   * the {@code Object} overload of {@code addValueOfDataType}, using that column's schema type.
+   *
+   * <p>NOTE(review): the result of {@code measurementIndex.get} is unboxed to {@code int}, so an
+   * unknown {@code measurementId} presumably fails with a NullPointerException. Verify this is the
+   * intended contract.
+   */
   public void addValue(String measurementId, int rowIndex, Object value) {
     int indexOfSchema = measurementIndex.get(measurementId);
     MeasurementSchema measurementSchema = schemas.get(indexOfSchema);
     addValueOfDataType(measurementSchema.getType(), rowIndex, indexOfSchema, value);
   }
 
+  /**
+   * Stores one {@link TsPrimitiveType} cell in the column buffer {@code values[indexOfSchema]} at
+   * position {@code rowIndex}.
+   *
+   * <p>A {@code null} cell is flagged in that column's {@link BitMap}, which is created lazily. A
+   * placeholder is then written into the value array: {@code Binary.EMPTY_VALUE} for TEXT, the
+   * boxed type's {@code MIN_VALUE} constant for numeric types, and {@code false} for BOOLEAN.
+   *
+   * @param dataType type of the target column
+   * @param rowIndex row to write
+   * @param indexOfSchema index of the target column
+   * @param value cell to store, or {@code null}
+   * @throws UnSupportedDataTypeException if {@code dataType} is not supported
+   */
+  private void addValueOfDataType(
+      TSDataType dataType, int rowIndex, int indexOfSchema, TsPrimitiveType value) {
+    boolean isNull = value == null;
+    if (isNull) {
+      // allocate the null-marker bitmaps on first use, then flag this cell
+      if (bitMaps == null) {
+        bitMaps = new BitMap[values.length];
+      }
+      BitMap columnBitMap = bitMaps[indexOfSchema];
+      if (columnBitMap == null) {
+        columnBitMap = new BitMap(maxRowNumber);
+        bitMaps[indexOfSchema] = columnBitMap;
+      }
+      columnBitMap.mark(rowIndex);
+    }
+
+    switch (dataType) {
+      case TEXT:
+        ((Binary[]) values[indexOfSchema])[rowIndex] =
+            isNull ? Binary.EMPTY_VALUE : value.getBinary();
+        break;
+      case FLOAT:
+        ((float[]) values[indexOfSchema])[rowIndex] = isNull ? Float.MIN_VALUE : value.getFloat();
+        break;
+      case INT32:
+        ((int[]) values[indexOfSchema])[rowIndex] = isNull ? Integer.MIN_VALUE : value.getInt();
+        break;
+      case INT64:
+        ((long[]) values[indexOfSchema])[rowIndex] = isNull ? Long.MIN_VALUE : value.getLong();
+        break;
+      case DOUBLE:
+        ((double[]) values[indexOfSchema])[rowIndex] =
+            isNull ? Double.MIN_VALUE : value.getDouble();
+        break;
+      case BOOLEAN:
+        ((boolean[]) values[indexOfSchema])[rowIndex] = !isNull && value.getBoolean();
+        break;
+      default:
+        throw new UnSupportedDataTypeException(String.format(NOT_SUPPORT_DATATYPE, dataType));
+    }
+  }
+
   private void addValueOfDataType(
       TSDataType dataType, int rowIndex, int indexOfSchema, Object value) {