You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ma...@apache.org on 2022/10/20 01:49:49 UTC

[iotdb] branch IOTDB-4693 created (now 6667567e44)

This is an automated email from the ASF dual-hosted git repository.

marklau99 pushed a change to branch IOTDB-4693
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at 6667567e44 refactor rewrite tool

This branch includes the following new commits:

     new 6667567e44 refactor rewrite tool

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: refactor rewrite tool

Posted by ma...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

marklau99 pushed a commit to branch IOTDB-4693
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 6667567e442268924fb8c29595aec373df392bc4
Author: Liu Xuxin <li...@outlook.com>
AuthorDate: Thu Oct 20 09:49:32 2022 +0800

    refactor rewrite tool
---
 .../java/org/apache/iotdb/RewriteTsFileTool.java   | 416 +++++++++++++--------
 1 file changed, 254 insertions(+), 162 deletions(-)

diff --git a/rewrite-tsfile-tool/src/main/java/org/apache/iotdb/RewriteTsFileTool.java b/rewrite-tsfile-tool/src/main/java/org/apache/iotdb/RewriteTsFileTool.java
index 4b1d47976d..c2cc99d0e9 100644
--- a/rewrite-tsfile-tool/src/main/java/org/apache/iotdb/RewriteTsFileTool.java
+++ b/rewrite-tsfile-tool/src/main/java/org/apache/iotdb/RewriteTsFileTool.java
@@ -27,42 +27,38 @@ import org.apache.iotdb.db.engine.modification.Deletion;
 import org.apache.iotdb.db.engine.modification.Modification;
 import org.apache.iotdb.db.engine.modification.ModificationFile;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
-import org.apache.iotdb.db.utils.QueryUtils;
 import org.apache.iotdb.rpc.IoTDBConnectionException;
 import org.apache.iotdb.rpc.StatementExecutionException;
 import org.apache.iotdb.session.Session;
 import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
 import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
+import org.apache.iotdb.tsfile.encoding.decoder.Decoder;
 import org.apache.iotdb.tsfile.exception.write.NoMeasurementException;
 import org.apache.iotdb.tsfile.file.MetaMarker;
 import org.apache.iotdb.tsfile.file.header.ChunkGroupHeader;
 import org.apache.iotdb.tsfile.file.header.ChunkHeader;
+import org.apache.iotdb.tsfile.file.header.PageHeader;
 import org.apache.iotdb.tsfile.file.metadata.AlignedChunkMetadata;
 import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
 import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
 import org.apache.iotdb.tsfile.read.TimeValuePair;
 import org.apache.iotdb.tsfile.read.TsFileAlignedSeriesReaderIterator;
+import org.apache.iotdb.tsfile.read.TsFileCheckStatus;
 import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
+import org.apache.iotdb.tsfile.read.common.BatchData;
 import org.apache.iotdb.tsfile.read.common.Chunk;
-import org.apache.iotdb.tsfile.read.common.Field;
 import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
-import org.apache.iotdb.tsfile.read.common.Path;
-import org.apache.iotdb.tsfile.read.common.RowRecord;
-import org.apache.iotdb.tsfile.read.controller.CachedChunkLoaderImpl;
-import org.apache.iotdb.tsfile.read.controller.IChunkLoader;
-import org.apache.iotdb.tsfile.read.controller.IMetadataQuerier;
-import org.apache.iotdb.tsfile.read.controller.MetadataQuerierByFileImpl;
-import org.apache.iotdb.tsfile.read.query.dataset.DataSetWithoutTimeGenerator;
-import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
 import org.apache.iotdb.tsfile.read.reader.IChunkReader;
 import org.apache.iotdb.tsfile.read.reader.IPointReader;
 import org.apache.iotdb.tsfile.read.reader.chunk.AlignedChunkReader;
 import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
-import org.apache.iotdb.tsfile.read.reader.series.AbstractFileSeriesReader;
-import org.apache.iotdb.tsfile.read.reader.series.EmptyFileSeriesReader;
-import org.apache.iotdb.tsfile.read.reader.series.FileSeriesReader;
+import org.apache.iotdb.tsfile.read.reader.page.PageReader;
+import org.apache.iotdb.tsfile.read.reader.page.TimePageReader;
+import org.apache.iotdb.tsfile.read.reader.page.ValuePageReader;
 import org.apache.iotdb.tsfile.utils.FilePathUtils;
 import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
@@ -79,12 +75,13 @@ import org.apache.commons.cli.ParseException;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -99,8 +96,7 @@ public class RewriteTsFileTool {
   private static String password = "root";
   private static String filePath = "";
   private static String readMode = "s";
-
-  private static Map<String, Set<MeasurementSchema>> device2Measurements;
+  private static boolean ignoreBrokenChunk = false;
 
   public static void main(String[] args) {
     Session session = null;
@@ -138,6 +134,7 @@ public class RewriteTsFileTool {
       password = getArgOrDefault(commandLine, "pw", password);
       filePath = getArgOrDefault(commandLine, "f", filePath);
       readMode = getArgOrDefault(commandLine, "rm", readMode);
+      ignoreBrokenChunk = commandLine.hasOption("ignore-broken");
     } catch (ParseException e) {
       System.out.printf("Parse Args Error. %s%n", e.getMessage());
       priHelp(options);
@@ -203,6 +200,15 @@ public class RewriteTsFileTool {
             .required()
             .build();
     options.addOption(readModeOpt);
+
+    Option ignoreBrokenChunkOpt =
+        Option.builder("ignore-broken")
+            .argName("IgnoreBrokenChunks")
+            .hasArg()
+            .desc("Ignore the broken chunks in the tsfile")
+            .type(boolean.class)
+            .build();
+    options.addOption(ignoreBrokenChunkOpt);
     return options;
   }
 
@@ -231,27 +237,10 @@ public class RewriteTsFileTool {
    */
   public static void writeToIoTDB(List<File> files, Session session) {
     sortTsFiles(files);
-    int size = files.size();
-    List<File> unloadTsFiles = new ArrayList<>();
-    System.out.printf("Collect TsFiles successfully, %d files to be loaded.%n", size);
+    System.out.printf("Collect TsFiles successfully, %d files to be loaded.%n", files.size());
     System.out.println("Start Loading TsFiles...");
     if (readMode.equals("s")) {
-      for (int i = 0; i < size; i++) {
-        File file = files.get(i);
-        System.out.printf("Loading %s(%d/%d)...", file.getPath(), i + 1, size);
-        try {
-          seqWriteTsFile(file.getPath(), session);
-        } catch (Exception e) {
-          System.out.println(
-              "------------------------------Error Message------------------------------");
-          e.printStackTrace();
-          System.out.println(
-              "------------------------------End Message------------------------------");
-          unloadTsFiles.add(file);
-          continue;
-        }
-        System.out.println("Done");
-      }
+      writeTsFileSequentially(files, session);
     } else {
       try {
         reverseWriteTsFile(files, session);
@@ -268,6 +257,27 @@ public class RewriteTsFileTool {
       }
     }
     System.out.println("Finish Loading TsFiles");
+  }
+
+  private static void writeTsFileSequentially(List<File> files, Session session) {
+    int size = files.size();
+    List<File> unloadTsFiles = new ArrayList<>();
+    for (int i = 0; i < files.size(); i++) {
+      File file = files.get(i);
+      System.out.printf("Loading %s(%d/%d)...", file.getPath(), i + 1, size);
+      try {
+        seqWriteSingleTsFile(file.getPath(), session);
+      } catch (Exception e) {
+        System.out.println(
+            "------------------------------Error Message------------------------------");
+        e.printStackTrace();
+        System.out.println(
+            "------------------------------End Message------------------------------");
+        unloadTsFiles.add(file);
+        continue;
+      }
+      System.out.println("Done");
+    }
     System.out.printf(
         "Load %d TsFiles successfully, %d TsFiles not loaded.%n",
         size - unloadTsFiles.size(), unloadTsFiles.size());
@@ -306,104 +316,31 @@ public class RewriteTsFileTool {
    * @param filename the file path to be loaded
    * @param session IoTDB session
    */
-  public static void seqWriteTsFile(String filename, Session session)
+  public static void seqWriteSingleTsFile(String filename, Session session)
       throws IOException, IllegalPathException, IoTDBConnectionException,
           StatementExecutionException, NoMeasurementException {
-    // parse modifications from .mods
-    List<Modification> modifications = null;
-    if (FSFactoryProducer.getFSFactory()
-        .getFile(filename + ModificationFile.FILE_SUFFIX)
-        .exists()) {
-      modifications =
-          (List<Modification>)
-              new ModificationFile(filename + ModificationFile.FILE_SUFFIX).getModifications();
-    }
-
-    // read all device and their measurements
-    parseDeviceFromTsFile(filename);
 
     try (TsFileSequenceReader reader = new TsFileSequenceReader(filename)) {
-      for (Map.Entry<String, Set<MeasurementSchema>> entry : device2Measurements.entrySet()) {
-        // collect measurements for device
-        boolean isAligned = false;
-        String curDevice = entry.getKey();
-        List<MeasurementSchema> measurementSchemas = new ArrayList<>();
-        ArrayList<Path> paths = new ArrayList<>();
-        for (MeasurementSchema measurementSchema : entry.getValue()) {
-          if (!measurementSchema.getType().equals(TSDataType.VECTOR)) {
-            measurementSchemas.add(measurementSchema);
-          } else {
-            isAligned = true;
-          }
-        }
-        for (MeasurementSchema measurementSchema : measurementSchemas) {
-          paths.add(new Path(curDevice, measurementSchema.getMeasurementId()));
-        }
-
-        // construct query to this tsfile
-        List<AbstractFileSeriesReader> readersOfSelectedSeries = new ArrayList<>();
-        List<TSDataType> dataTypes = new ArrayList<>();
-        IMetadataQuerier metadataQuerier = new MetadataQuerierByFileImpl(reader);
-        IChunkLoader chunkLoader = new CachedChunkLoaderImpl(reader);
-        for (Path path : paths) {
-          List<IChunkMetadata> chunkMetadataList = metadataQuerier.getChunkMetaDataList(path);
-          modifyChunkMetadata(isAligned, path, chunkMetadataList, modifications);
-          AbstractFileSeriesReader seriesReader;
-          if (chunkMetadataList.isEmpty()) {
-            seriesReader = new EmptyFileSeriesReader();
-            dataTypes.add(metadataQuerier.getDataType(path));
-          } else {
-            seriesReader = new FileSeriesReader(chunkLoader, chunkMetadataList, null);
-            dataTypes.add(chunkMetadataList.get(0).getDataType());
-          }
-          readersOfSelectedSeries.add(seriesReader);
-        }
-
-        // read data from tsfile and construct session to send to IoTDB
-        QueryDataSet dataSet =
-            new DataSetWithoutTimeGenerator(paths, dataTypes, readersOfSelectedSeries);
-        Tablet tablet = new Tablet(curDevice, measurementSchemas, MAX_TABLET_LENGTH);
-        tablet.initBitMaps();
-        int measurementSize = measurementSchemas.size();
-        while (dataSet.hasNext()) {
-          RowRecord rowRecord = dataSet.next();
-          tablet.addTimestamp(tablet.rowSize, rowRecord.getTimestamp());
-          for (int i = 0; i < measurementSize; i++) {
-            Field field = rowRecord.getFields().get(i);
-            if (field == null) {
-              tablet.bitMaps[i].mark(tablet.rowSize);
-            } else {
-              tablet.addValue(
-                  measurementSchemas.get(i).getMeasurementId(),
-                  tablet.rowSize,
-                  field.getObjectValue(field.getDataType()));
-            }
-          }
-          tablet.rowSize++;
-          if (tablet.rowSize == MAX_TABLET_LENGTH) {
-            if (isAligned) {
-              session.insertAlignedTablet(tablet);
-            } else {
-              session.insertTablet(tablet);
-            }
-            tablet.reset();
-          }
-        }
-        if (isAligned) {
-          session.insertAlignedTablet(tablet);
-        } else {
-          session.insertTablet(tablet);
+      if (!ignoreBrokenChunk) {
+        long status = reader.selfCheck(new HashMap<>(), new ArrayList<>(), true);
+        if (status == TsFileCheckStatus.INCOMPATIBLE_FILE
+            || status == TsFileCheckStatus.FILE_EXISTS_MISTAKES) {
+          throw new IOException(
+              String.format(
+                  "The file %s is incompatible, cannot rewrite it to IoTDB. If you want to rewrite "
+                      + "all the good chunks in the file, retry with option -ignore-broken.",
+                  filename));
         }
       }
-    }
-  }
-
-  private static void parseDeviceFromTsFile(String filename) throws IOException {
-    device2Measurements = new HashMap<>();
-    try (TsFileSequenceReader reader = new TsFileSequenceReader(filename)) {
       reader.position((long) TSFileConfig.MAGIC_STRING.getBytes().length + 1);
-      String curDevice = null;
+      List<long[]> timeBatch = new ArrayList<>();
+      int pageIndex = 0;
       byte marker;
+      String currentDevice = null;
+      boolean isAlignedChunk = false;
+      List<List<TsPrimitiveType>> valueForAlignedSeries = new ArrayList<>();
+      List<Long> timeForAlignedSeries = new ArrayList<>();
+      List<MeasurementSchema> schemaForAlignedSeries = new ArrayList<>();
       while ((marker = reader.readMarker()) != MetaMarker.SEPARATOR) {
         switch (marker) {
           case MetaMarker.CHUNK_HEADER:
@@ -413,60 +350,215 @@ public class RewriteTsFileTool {
           case MetaMarker.ONLY_ONE_PAGE_TIME_CHUNK_HEADER:
           case MetaMarker.ONLY_ONE_PAGE_VALUE_CHUNK_HEADER:
             ChunkHeader header = reader.readChunkHeader(marker);
-            MeasurementSchema measurementSchema =
-                new MeasurementSchema(
-                    header.getMeasurementID(),
-                    header.getDataType(),
-                    header.getEncodingType(),
-                    header.getCompressionType());
-            device2Measurements
-                .computeIfAbsent(curDevice, o -> new HashSet<>())
-                .add(measurementSchema);
-            reader.position(reader.position() + header.getDataSize());
+            if (header.getDataSize() == 0) {
+              // empty value chunk
+              break;
+            }
+            Decoder defaultTimeDecoder =
+                Decoder.getDecoderByType(
+                    TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder()),
+                    TSDataType.INT64);
+            Decoder valueDecoder =
+                Decoder.getDecoderByType(header.getEncodingType(), header.getDataType());
+            int dataSize = header.getDataSize();
+            pageIndex = 0;
+            if (header.getDataType() == TSDataType.VECTOR) {
+              timeBatch.clear();
+            }
+            boolean addSchema = false;
+            List<TsPrimitiveType> valueList = new ArrayList<>();
+            while (dataSize > 0) {
+              valueDecoder.reset();
+              PageHeader pageHeader =
+                  reader.readPageHeader(
+                      header.getDataType(),
+                      (header.getChunkType() & 0x3F) == MetaMarker.CHUNK_HEADER);
+              ByteBuffer pageData = reader.readPage(pageHeader, header.getCompressionType());
+              if ((header.getChunkType() & (byte) TsFileConstant.TIME_COLUMN_MASK)
+                  == (byte) TsFileConstant.TIME_COLUMN_MASK) { // Time Chunk
+                readAndSendTimePage(
+                    addSchema,
+                    schemaForAlignedSeries,
+                    header,
+                    pageHeader,
+                    pageData,
+                    defaultTimeDecoder,
+                    timeBatch,
+                    pageIndex,
+                    timeForAlignedSeries);
+                addSchema = true;
+                isAlignedChunk = true;
+              } else if ((header.getChunkType() & (byte) TsFileConstant.VALUE_COLUMN_MASK)
+                  == (byte) TsFileConstant.VALUE_COLUMN_MASK) { // Value Chunk
+                readAndSendValuePage(
+                    addSchema,
+                    schemaForAlignedSeries,
+                    header,
+                    pageHeader,
+                    pageData,
+                    valueDecoder,
+                    timeBatch,
+                    pageIndex,
+                    valueList);
+              } else { // NonAligned Chunk
+                readAndSendSingleSeriesPage(
+                    currentDevice, header, pageData, valueDecoder, defaultTimeDecoder, session);
+              }
+              pageIndex++;
+              dataSize -= pageHeader.getSerializedPageSize();
+            }
+            if (isAlignedChunk) {
+              valueForAlignedSeries.add(valueList);
+            }
             break;
           case MetaMarker.CHUNK_GROUP_HEADER:
+            // get the next chunk group
+            if (isAlignedChunk) {
+              Tablet tablet = new Tablet(currentDevice, schemaForAlignedSeries, MAX_TABLET_LENGTH);
+              for (int i = 0; i < timeForAlignedSeries.size(); ++i) {
+                tablet.timestamps[tablet.rowSize] = timeForAlignedSeries.get(i);
+                TsPrimitiveType[] values = new TsPrimitiveType[valueForAlignedSeries.size()];
+                for (int j = 0; j < valueForAlignedSeries.size(); ++j) {
+                  if (valueForAlignedSeries.get(j).size() < i) {
+                    values[j] = valueForAlignedSeries.get(j).get(i);
+                  } else {
+                    values[j] = null;
+                  }
+                }
+                tablet.values[tablet.rowSize++] = values;
+                if (tablet.rowSize >= MAX_TABLET_LENGTH) {
+                  session.insertAlignedTablet(tablet);
+                  tablet.reset();
+                }
+              }
+              if (tablet.rowSize >= 0) {
+                session.insertAlignedTablet(tablet);
+                tablet.reset();
+              }
+            }
+            isAlignedChunk = false;
             ChunkGroupHeader chunkGroupHeader = reader.readChunkGroupHeader();
-            curDevice = chunkGroupHeader.getDeviceID();
+            currentDevice = chunkGroupHeader.getDeviceID();
             break;
           case MetaMarker.OPERATION_INDEX_RANGE:
             reader.readPlanIndex();
             break;
           default:
-            MetaMarker.handleUnexpectedMarker(marker);
+            System.out.printf(
+                "Cannot handle marker %d in position %d, stop reading %s%n",
+                marker, reader.position(), filename);
         }
       }
     }
+
+    writeModification(filename, session);
   }
 
-  private static void modifyChunkMetadata(
-      boolean isAligned,
-      Path path,
-      List<IChunkMetadata> chunkMetadataList,
-      List<Modification> modifications)
-      throws IllegalPathException {
-    if (modifications == null || modifications.isEmpty()) {
-      return;
+  private static void readAndSendValuePage(
+      boolean addSchema,
+      List<MeasurementSchema> schemaForAlignedSeries,
+      ChunkHeader header,
+      PageHeader pageHeader,
+      ByteBuffer pageData,
+      Decoder valueDecoder,
+      List<long[]> timeBatch,
+      int pageIndex,
+      List<TsPrimitiveType> valueList) {
+    if (!addSchema) {
+      schemaForAlignedSeries.add(
+          new MeasurementSchema(
+              header.getMeasurementID(),
+              header.getDataType(),
+              header.getEncodingType(),
+              header.getCompressionType()));
+      addSchema = true;
     }
-    List<Modification> measurementModifications = new ArrayList<>();
-    Iterator<Modification> modsIterator = modifications.listIterator();
-    Deletion currentDeletion;
-    while (modsIterator.hasNext()) {
-      currentDeletion = (Deletion) modsIterator.next();
-      // if deletion path match the chunkPath, then add the deletion to the list
-      if (currentDeletion.getPath().matchFullPath(new PartialPath(path.getFullPath()))) {
-        measurementModifications.add(currentDeletion);
+    ValuePageReader valuePageReader =
+        new ValuePageReader(pageHeader, pageData, header.getDataType(), valueDecoder);
+    TsPrimitiveType[] valueBatch = valuePageReader.nextValueBatch(timeBatch.get(pageIndex));
+    valueList.addAll(Arrays.asList(valueBatch));
+  }
+
+  private static void readAndSendTimePage(
+      boolean addSchema,
+      List<MeasurementSchema> schemaForAlignedSeries,
+      ChunkHeader header,
+      PageHeader pageHeader,
+      ByteBuffer pageData,
+      Decoder defaultTimeDecoder,
+      List<long[]> timeBatch,
+      int pageIndex,
+      List<Long> timeForAlignedSeries)
+      throws IOException {
+    if (!addSchema) {
+      schemaForAlignedSeries.add(
+          new MeasurementSchema(
+              header.getMeasurementID(),
+              header.getDataType(),
+              header.getEncodingType(),
+              header.getCompressionType()));
+    }
+    TimePageReader timePageReader = new TimePageReader(pageHeader, pageData, defaultTimeDecoder);
+    timeBatch.add(timePageReader.getNextTimeBatch());
+    for (int i = 0; i < timeBatch.get(pageIndex).length; i++) {
+      timeForAlignedSeries.add(timeBatch.get(pageIndex)[i]);
+    }
+  }
+
+  private static void readAndSendSingleSeriesPage(
+      String currentDevice,
+      ChunkHeader header,
+      ByteBuffer pageData,
+      Decoder valueDecoder,
+      Decoder defaultTimeDecoder,
+      Session session)
+      throws IOException, IoTDBConnectionException, StatementExecutionException {
+    Tablet tablet =
+        new Tablet(
+            currentDevice,
+            Collections.singletonList(
+                new MeasurementSchema(
+                    header.getMeasurementID(),
+                    header.getDataType(),
+                    header.getEncodingType(),
+                    header.getCompressionType())),
+            MAX_TABLET_LENGTH);
+    PageReader pageReader =
+        new PageReader(pageData, header.getDataType(), valueDecoder, defaultTimeDecoder, null);
+    BatchData batchData = pageReader.getAllSatisfiedPageData();
+    while (batchData.hasCurrent()) {
+      tablet.timestamps[tablet.rowSize] = batchData.currentTime();
+      tablet.values[tablet.rowSize++] = batchData.currentValue();
+      if (tablet.rowSize >= MAX_TABLET_LENGTH) {
+        session.insertTablet(tablet);
+        tablet.reset();
       }
+      batchData.next();
     }
-    if (!isAligned) {
-      QueryUtils.modifyChunkMetaData(chunkMetadataList, measurementModifications);
-    } else {
-      List<AlignedChunkMetadata> alignedChunkMetadataList = new ArrayList<>();
-      for (IChunkMetadata chunkMetadata : chunkMetadataList) {
-        alignedChunkMetadataList.add((AlignedChunkMetadata) chunkMetadata);
+    if (tablet.rowSize > 0) {
+      session.insertTablet(tablet);
+      tablet.reset();
+    }
+  }
+
+  private static void writeModification(String filename, Session session)
+      throws IoTDBConnectionException, StatementExecutionException {
+    List<Modification> modifications = null;
+    if (FSFactoryProducer.getFSFactory()
+        .getFile(filename + ModificationFile.FILE_SUFFIX)
+        .exists()) {
+      modifications =
+          (List<Modification>)
+              new ModificationFile(filename + ModificationFile.FILE_SUFFIX).getModifications();
+      for (Modification modification : modifications) {
+        session.executeNonQueryStatement(
+            String.format(
+                "delete from %s.%s where time >= %d and time <= %d",
+                modification.getDevice(),
+                modification.getMeasurement(),
+                ((Deletion) modification).getStartTime(),
+                ((Deletion) modification).getEndTime()));
       }
-      // AlignedChunk only contains one valueChunkMetadata which is measurement with this path
-      QueryUtils.modifyAlignedChunkMetaData(
-          alignedChunkMetadataList, Collections.singletonList(measurementModifications));
     }
   }