You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2021/08/10 00:29:45 UTC

[iotdb] branch master updated: [IOTDB-1546] Optimize the Upgrade/Rewrite Tool rewrite logic to reduce the temp memory cost (#3701)

This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 4884dc8  [IOTDB-1546] Optimize the Upgrade/Rewrite Tool rewrite logic to reduce the temp memory cost (#3701)
4884dc8 is described below

commit 4884dc87baaf3ca7126dd297defece7da8a6ce97
Author: Haonan <hh...@outlook.com>
AuthorDate: Tue Aug 10 08:29:18 2021 +0800

    [IOTDB-1546] Optimize the Upgrade/Rewrite Tool rewrite logic to reduce the temp memory cost (#3701)
---
 .../apache/iotdb/db/tools/TsFileRewriteTool.java   | 148 ++++++++------------
 .../db/tools/upgrade/TsFileOnlineUpgradeTool.java  | 151 ++++++++++++---------
 2 files changed, 150 insertions(+), 149 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/tools/TsFileRewriteTool.java b/server/src/main/java/org/apache/iotdb/db/tools/TsFileRewriteTool.java
index c52457b..60c42ea 100644
--- a/server/src/main/java/org/apache/iotdb/db/tools/TsFileRewriteTool.java
+++ b/server/src/main/java/org/apache/iotdb/db/tools/TsFileRewriteTool.java
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.iotdb.db.tools;
 
 import org.apache.iotdb.db.engine.StorageEngine;
@@ -44,7 +45,6 @@ import org.apache.iotdb.tsfile.utils.Binary;
 import org.apache.iotdb.tsfile.v2.read.TsFileSequenceReaderForV2;
 import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl;
 import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
-import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
 
@@ -159,15 +159,19 @@ public class TsFileRewriteTool implements AutoCloseable {
     int headerLength = TSFileConfig.MAGIC_STRING.getBytes().length + Byte.BYTES;
     reader.position(headerLength);
     // start to scan chunks and chunkGroups
-    List<List<PageHeader>> pageHeadersInChunkGroup = new ArrayList<>();
-    List<List<ByteBuffer>> pageDataInChunkGroup = new ArrayList<>();
-    List<List<Boolean>> needToDecodeInfoInChunkGroup = new ArrayList<>();
     byte marker;
-    List<IMeasurementSchema> measurementSchemaList = new ArrayList<>();
-    String lastChunkGroupDeviceId = null;
+
+    String deviceId = null;
+    boolean firstChunkInChunkGroup = true;
     try {
       while ((marker = reader.readMarker()) != MetaMarker.SEPARATOR) {
         switch (marker) {
+          case MetaMarker.CHUNK_GROUP_HEADER:
+            ChunkGroupHeader chunkGroupHeader = reader.readChunkGroupHeader();
+            deviceId = chunkGroupHeader.getDeviceID();
+            firstChunkInChunkGroup = true;
+            endChunkGroup();
+            break;
           case MetaMarker.CHUNK_HEADER:
           case MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER:
             ChunkHeader header = reader.readChunkHeader(marker);
@@ -177,7 +181,6 @@ public class TsFileRewriteTool implements AutoCloseable {
                     header.getDataType(),
                     header.getEncodingType(),
                     header.getCompressionType());
-            measurementSchemaList.add(measurementSchema);
             TSDataType dataType = header.getDataType();
             TSEncoding encoding = header.getEncodingType();
             List<PageHeader> pageHeadersInChunk = new ArrayList<>();
@@ -195,26 +198,14 @@ public class TsFileRewriteTool implements AutoCloseable {
               dataInChunk.add(pageData);
               dataSize -= pageHeader.getSerializedPageSize();
             }
-            pageHeadersInChunkGroup.add(pageHeadersInChunk);
-            pageDataInChunkGroup.add(dataInChunk);
-            needToDecodeInfoInChunkGroup.add(needToDecodeInfo);
-            break;
-          case MetaMarker.CHUNK_GROUP_HEADER:
-            ChunkGroupHeader chunkGroupHeader = reader.readChunkGroupHeader();
-            String deviceId = chunkGroupHeader.getDeviceID();
-            if (lastChunkGroupDeviceId != null && !measurementSchemaList.isEmpty()) {
-              rewrite(
-                  lastChunkGroupDeviceId,
-                  measurementSchemaList,
-                  pageHeadersInChunkGroup,
-                  pageDataInChunkGroup,
-                  needToDecodeInfoInChunkGroup);
-              pageHeadersInChunkGroup.clear();
-              pageDataInChunkGroup.clear();
-              measurementSchemaList.clear();
-              needToDecodeInfoInChunkGroup.clear();
-            }
-            lastChunkGroupDeviceId = deviceId;
+            reWriteChunk(
+                deviceId,
+                firstChunkInChunkGroup,
+                measurementSchema,
+                pageHeadersInChunk,
+                dataInChunk,
+                needToDecodeInfo);
+            firstChunkInChunkGroup = false;
             break;
           case MetaMarker.OPERATION_INDEX_RANGE:
             reader.readPlanIndex();
@@ -239,19 +230,7 @@ public class TsFileRewriteTool implements AutoCloseable {
             MetaMarker.handleUnexpectedMarker(marker);
         }
       }
-
-      if (!measurementSchemaList.isEmpty()) {
-        rewrite(
-            lastChunkGroupDeviceId,
-            measurementSchemaList,
-            pageHeadersInChunkGroup,
-            pageDataInChunkGroup,
-            needToDecodeInfoInChunkGroup);
-        pageHeadersInChunkGroup.clear();
-        pageDataInChunkGroup.clear();
-        measurementSchemaList.clear();
-        needToDecodeInfoInChunkGroup.clear();
-      }
+      endChunkGroup();
       // close upgraded tsFiles and generate resources for them
       for (TsFileIOWriter tsFileIOWriter : partitionWriterMap.values()) {
         rewrittenResources.add(endFileAndGenerateResource(tsFileIOWriter));
@@ -302,44 +281,43 @@ public class TsFileRewriteTool implements AutoCloseable {
   }
 
   /**
-   * This method is for rewriting the ChunkGroup which data is in the different time partitions. In
-   * this case, we have to decode the data to points, and then rewrite the data points to different
+   * Rewrites a single Chunk whose data may span different time partitions. In that case, we
+   * have to decode the data into points, and then rewrite the data points to different
    * chunkWriters, finally write chunks to their own upgraded TsFiles.
    */
-  protected void rewrite(
+  protected void reWriteChunk(
       String deviceId,
-      List<IMeasurementSchema> schemas,
-      List<List<PageHeader>> pageHeadersInChunkGroup,
-      List<List<ByteBuffer>> dataInChunkGroup,
-      List<List<Boolean>> needToDecodeInfoInChunkGroup)
+      boolean firstChunkInChunkGroup,
+      MeasurementSchema schema,
+      List<PageHeader> pageHeadersInChunk,
+      List<ByteBuffer> pageDataInChunk,
+      List<Boolean> needToDecodeInfoInChunk)
       throws IOException, PageException {
-    Map<Long, Map<IMeasurementSchema, ChunkWriterImpl>> chunkWritersInChunkGroup = new HashMap<>();
-    for (int i = 0; i < schemas.size(); i++) {
-      IMeasurementSchema schema = schemas.get(i);
-      List<ByteBuffer> pageDataInChunk = dataInChunkGroup.get(i);
-      List<PageHeader> pageHeadersInChunk = pageHeadersInChunkGroup.get(i);
-      List<Boolean> needToDecodeInfoInChunk = needToDecodeInfoInChunkGroup.get(i);
-      valueDecoder = Decoder.getDecoderByType(schema.getEncodingType(), schema.getType());
-      for (int j = 0; j < pageDataInChunk.size(); j++) {
-        if (Boolean.TRUE.equals(needToDecodeInfoInChunk.get(j))) {
-          decodeAndWritePageInToFiles(schema, pageDataInChunk.get(j), chunkWritersInChunkGroup);
-        } else {
-          writePageInToFile(
-              schema, pageHeadersInChunk.get(j), pageDataInChunk.get(j), chunkWritersInChunkGroup);
-        }
+    valueDecoder = Decoder.getDecoderByType(schema.getEncodingType(), schema.getType());
+    Map<Long, ChunkWriterImpl> partitionChunkWriterMap = new HashMap<>();
+    for (int i = 0; i < pageDataInChunk.size(); i++) {
+      if (Boolean.TRUE.equals(needToDecodeInfoInChunk.get(i))) {
+        decodeAndWritePage(schema, pageDataInChunk.get(i), partitionChunkWriterMap);
+      } else {
+        writePage(
+            schema, pageHeadersInChunk.get(i), pageDataInChunk.get(i), partitionChunkWriterMap);
       }
     }
-
-    for (Entry<Long, Map<IMeasurementSchema, ChunkWriterImpl>> entry :
-        chunkWritersInChunkGroup.entrySet()) {
+    for (Entry<Long, ChunkWriterImpl> entry : partitionChunkWriterMap.entrySet()) {
       long partitionId = entry.getKey();
       TsFileIOWriter tsFileIOWriter = partitionWriterMap.get(partitionId);
-      tsFileIOWriter.startChunkGroup(deviceId);
-      // write chunks to their own upgraded tsFiles
-      for (IChunkWriter chunkWriter : entry.getValue().values()) {
-        chunkWriter.writeToFileWriter(tsFileIOWriter);
+      if (firstChunkInChunkGroup) {
+        tsFileIOWriter.startChunkGroup(deviceId);
       }
-      tsFileIOWriter.endChunkGroup();
+      // write chunks to their own upgraded tsFiles
+      IChunkWriter chunkWriter = entry.getValue();
+      chunkWriter.writeToFileWriter(tsFileIOWriter);
+    }
+  }
+
+  protected void endChunkGroup() throws IOException {
+    for (TsFileIOWriter tsFileIoWriter : partitionWriterMap.values()) {
+      tsFileIoWriter.endChunkGroup();
     }
   }
 
@@ -381,46 +359,42 @@ public class TsFileRewriteTool implements AutoCloseable {
         });
   }
 
-  protected void writePageInToFile(
-      IMeasurementSchema schema,
+  protected void writePage(
+      MeasurementSchema schema,
       PageHeader pageHeader,
       ByteBuffer pageData,
-      Map<Long, Map<IMeasurementSchema, ChunkWriterImpl>> chunkWritersInChunkGroup)
+      Map<Long, ChunkWriterImpl> partitionChunkWriterMap)
       throws PageException {
     long partitionId = StorageEngine.getTimePartition(pageHeader.getStartTime());
     getOrDefaultTsFileIOWriter(oldTsFile, partitionId);
-    Map<IMeasurementSchema, ChunkWriterImpl> chunkWriters =
-        chunkWritersInChunkGroup.getOrDefault(partitionId, new HashMap<>());
-    ChunkWriterImpl chunkWriter = chunkWriters.getOrDefault(schema, new ChunkWriterImpl(schema));
+    ChunkWriterImpl chunkWriter =
+        partitionChunkWriterMap.computeIfAbsent(partitionId, v -> new ChunkWriterImpl(schema));
     chunkWriter.writePageHeaderAndDataIntoBuff(pageData, pageHeader);
-    chunkWriters.put(schema, chunkWriter);
-    chunkWritersInChunkGroup.put(partitionId, chunkWriters);
   }
 
-  protected void decodeAndWritePageInToFiles(
-      IMeasurementSchema schema,
+  protected void decodeAndWritePage(
+      MeasurementSchema schema,
       ByteBuffer pageData,
-      Map<Long, Map<IMeasurementSchema, ChunkWriterImpl>> chunkWritersInChunkGroup)
+      Map<Long, ChunkWriterImpl> partitionChunkWriterMap)
       throws IOException {
     valueDecoder.reset();
     PageReader pageReader =
         new PageReader(pageData, schema.getType(), valueDecoder, defaultTimeDecoder, null);
     BatchData batchData = pageReader.getAllSatisfiedPageData();
-    rewritePageIntoFiles(batchData, schema, chunkWritersInChunkGroup);
+    rewritePageIntoFiles(batchData, schema, partitionChunkWriterMap);
   }
 
   protected void rewritePageIntoFiles(
       BatchData batchData,
-      IMeasurementSchema schema,
-      Map<Long, Map<IMeasurementSchema, ChunkWriterImpl>> chunkWritersInChunkGroup) {
+      MeasurementSchema schema,
+      Map<Long, ChunkWriterImpl> partitionChunkWriterMap) {
     while (batchData.hasCurrent()) {
       long time = batchData.currentTime();
       Object value = batchData.currentValue();
       long partitionId = StorageEngine.getTimePartition(time);
 
-      Map<IMeasurementSchema, ChunkWriterImpl> chunkWriters =
-          chunkWritersInChunkGroup.getOrDefault(partitionId, new HashMap<>());
-      ChunkWriterImpl chunkWriter = chunkWriters.getOrDefault(schema, new ChunkWriterImpl(schema));
+      ChunkWriterImpl chunkWriter =
+          partitionChunkWriterMap.computeIfAbsent(partitionId, v -> new ChunkWriterImpl(schema));
       getOrDefaultTsFileIOWriter(oldTsFile, partitionId);
       switch (schema.getType()) {
         case INT32:
@@ -446,8 +420,6 @@ public class TsFileRewriteTool implements AutoCloseable {
               String.format("Data type %s is not supported.", schema.getType()));
       }
       batchData.next();
-      chunkWriters.put(schema, chunkWriter);
-      chunkWritersInChunkGroup.put(partitionId, chunkWriters);
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/tools/upgrade/TsFileOnlineUpgradeTool.java b/server/src/main/java/org/apache/iotdb/db/tools/upgrade/TsFileOnlineUpgradeTool.java
index 8b04bba..0198844 100644
--- a/server/src/main/java/org/apache/iotdb/db/tools/upgrade/TsFileOnlineUpgradeTool.java
+++ b/server/src/main/java/org/apache/iotdb/db/tools/upgrade/TsFileOnlineUpgradeTool.java
@@ -36,7 +36,6 @@ import org.apache.iotdb.tsfile.read.common.BatchData;
 import org.apache.iotdb.tsfile.v2.read.TsFileSequenceReaderForV2;
 import org.apache.iotdb.tsfile.v2.read.reader.page.PageReaderV2;
 import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl;
-import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
 
@@ -93,70 +92,99 @@ public class TsFileOnlineUpgradeTool extends TsFileRewriteTool {
         TSFileConfig.MAGIC_STRING.getBytes().length
             + TSFileConfig.VERSION_NUMBER_V2.getBytes().length;
     reader.position(headerLength);
-    List<List<PageHeader>> pageHeadersInChunkGroup = new ArrayList<>();
-    List<List<ByteBuffer>> pageDataInChunkGroup = new ArrayList<>();
-    List<List<Boolean>> needToDecodeInfoInChunkGroup = new ArrayList<>();
     byte marker;
-    List<IMeasurementSchema> measurementSchemaList = new ArrayList<>();
+    long firstChunkPositionInChunkGroup = headerLength;
+    boolean firstChunkInChunkGroup = true;
+    String deviceId = null;
+    boolean skipReadingChunk = true;
     try {
       while ((marker = reader.readMarker()) != MetaMarker.SEPARATOR) {
         switch (marker) {
           case MetaMarker.CHUNK_HEADER:
-            ChunkHeader header = ((TsFileSequenceReaderForV2) reader).readChunkHeader();
-            IMeasurementSchema measurementSchema =
-                new MeasurementSchema(
-                    header.getMeasurementID(),
-                    header.getDataType(),
-                    header.getEncodingType(),
-                    header.getCompressionType());
-            measurementSchemaList.add(measurementSchema);
-            TSDataType dataType = header.getDataType();
-            TSEncoding encoding = header.getEncodingType();
-            List<PageHeader> pageHeadersInChunk = new ArrayList<>();
-            List<ByteBuffer> dataInChunk = new ArrayList<>();
-            List<Boolean> needToDecodeInfo = new ArrayList<>();
-            int dataSize = header.getDataSize();
-            while (dataSize > 0) {
-              // a new Page
-              PageHeader pageHeader = ((TsFileSequenceReaderForV2) reader).readPageHeader(dataType);
-              boolean needToDecode = checkIfNeedToDecode(dataType, encoding, pageHeader);
-              needToDecodeInfo.add(needToDecode);
-              ByteBuffer pageData =
-                  !needToDecode
-                      ? ((TsFileSequenceReaderForV2) reader).readCompressedPage(pageHeader)
-                      : reader.readPage(pageHeader, header.getCompressionType());
-              pageHeadersInChunk.add(pageHeader);
-              dataInChunk.add(pageData);
-              dataSize -=
-                  (Integer.BYTES * 2 // the bytes size of uncompressedSize and compressedSize
-                      // count, startTime, endTime bytes size in old statistics
-                      + 24
-                      // statistics bytes size
-                      // new boolean StatsSize is 8 bytes larger than old one
-                      + (pageHeader.getStatistics().getStatsSize()
-                          - (dataType == TSDataType.BOOLEAN ? 8 : 0))
-                      // page data bytes
-                      + pageHeader.getCompressedSize());
+            if (skipReadingChunk || deviceId == null) {
+              ChunkHeader header = ((TsFileSequenceReaderForV2) reader).readChunkHeader();
+              int dataSize = header.getDataSize();
+              while (dataSize > 0) {
+                // a new Page
+                PageHeader pageHeader =
+                    ((TsFileSequenceReaderForV2) reader).readPageHeader(header.getDataType());
+                ((TsFileSequenceReaderForV2) reader).readCompressedPage(pageHeader);
+                dataSize -=
+                    (Integer.BYTES * 2 // the bytes size of uncompressedSize and compressedSize
+                        // count, startTime, endTime bytes size in old statistics
+                        + 24
+                        // statistics bytes size
+                        // new boolean StatsSize is 8 bytes larger than old one
+                        + (pageHeader.getStatistics().getStatsSize()
+                            - (header.getDataType() == TSDataType.BOOLEAN ? 8 : 0))
+                        // page data bytes
+                        + pageHeader.getCompressedSize());
+              }
+            } else {
+              ChunkHeader header = ((TsFileSequenceReaderForV2) reader).readChunkHeader();
+              MeasurementSchema measurementSchema =
+                  new MeasurementSchema(
+                      header.getMeasurementID(),
+                      header.getDataType(),
+                      header.getEncodingType(),
+                      header.getCompressionType());
+              TSDataType dataType = header.getDataType();
+              TSEncoding encoding = header.getEncodingType();
+              List<PageHeader> pageHeadersInChunk = new ArrayList<>();
+              List<ByteBuffer> dataInChunk = new ArrayList<>();
+              List<Boolean> needToDecodeInfo = new ArrayList<>();
+              int dataSize = header.getDataSize();
+              while (dataSize > 0) {
+                // a new Page
+                PageHeader pageHeader =
+                    ((TsFileSequenceReaderForV2) reader).readPageHeader(dataType);
+                boolean needToDecode = checkIfNeedToDecode(dataType, encoding, pageHeader);
+                needToDecodeInfo.add(needToDecode);
+                ByteBuffer pageData =
+                    !needToDecode
+                        ? ((TsFileSequenceReaderForV2) reader).readCompressedPage(pageHeader)
+                        : reader.readPage(pageHeader, header.getCompressionType());
+                pageHeadersInChunk.add(pageHeader);
+                dataInChunk.add(pageData);
+                dataSize -=
+                    (Integer.BYTES * 2 // the bytes size of uncompressedSize and compressedSize
+                        // count, startTime, endTime bytes size in old statistics
+                        + 24
+                        // statistics bytes size
+                        // new boolean StatsSize is 8 bytes larger than old one
+                        + (pageHeader.getStatistics().getStatsSize()
+                            - (dataType == TSDataType.BOOLEAN ? 8 : 0))
+                        // page data bytes
+                        + pageHeader.getCompressedSize());
+              }
+              reWriteChunk(
+                  deviceId,
+                  firstChunkInChunkGroup,
+                  measurementSchema,
+                  pageHeadersInChunk,
+                  dataInChunk,
+                  needToDecodeInfo);
+              if (firstChunkInChunkGroup) {
+                firstChunkInChunkGroup = false;
+              }
             }
-            pageHeadersInChunkGroup.add(pageHeadersInChunk);
-            pageDataInChunkGroup.add(dataInChunk);
-            needToDecodeInfoInChunkGroup.add(needToDecodeInfo);
             break;
           case MetaMarker.CHUNK_GROUP_HEADER:
             // this is the footer of a ChunkGroup in TsFileV2.
-            ChunkGroupHeader chunkGroupFooter =
-                ((TsFileSequenceReaderForV2) reader).readChunkGroupFooter();
-            String deviceID = chunkGroupFooter.getDeviceID();
-            rewrite(
-                deviceID,
-                measurementSchemaList,
-                pageHeadersInChunkGroup,
-                pageDataInChunkGroup,
-                needToDecodeInfoInChunkGroup);
-            pageHeadersInChunkGroup.clear();
-            pageDataInChunkGroup.clear();
-            measurementSchemaList.clear();
-            needToDecodeInfoInChunkGroup.clear();
+            if (skipReadingChunk) {
+              skipReadingChunk = false;
+              ChunkGroupHeader chunkGroupFooter =
+                  ((TsFileSequenceReaderForV2) reader).readChunkGroupFooter();
+              deviceId = chunkGroupFooter.getDeviceID();
+              reader.position(firstChunkPositionInChunkGroup);
+            } else {
+              endChunkGroup();
+              skipReadingChunk = true;
+              ((TsFileSequenceReaderForV2) reader).readChunkGroupFooter();
+              deviceId = null;
+              firstChunkPositionInChunkGroup = reader.position();
+              firstChunkInChunkGroup = true;
+            }
             break;
           case MetaMarker.VERSION:
             long version = ((TsFileSequenceReaderForV2) reader).readVersion();
@@ -184,6 +212,7 @@ public class TsFileOnlineUpgradeTool extends TsFileRewriteTool {
             for (TsFileIOWriter tsFileIOWriter : partitionWriterMap.values()) {
               tsFileIOWriter.writePlanIndices();
             }
+            firstChunkPositionInChunkGroup = reader.position();
             break;
           default:
             // the disk file is corrupted, using this file may be dangerous
@@ -213,7 +242,7 @@ public class TsFileOnlineUpgradeTool extends TsFileRewriteTool {
           currentMod = null;
         }
       }
-    } catch (IOException e2) {
+    } catch (Exception e2) {
       throw new IOException(
           "TsFile upgrade process cannot proceed at position "
               + reader.position()
@@ -249,16 +278,16 @@ public class TsFileOnlineUpgradeTool extends TsFileRewriteTool {
   }
 
   @Override
-  protected void decodeAndWritePageInToFiles(
-      IMeasurementSchema schema,
+  protected void decodeAndWritePage(
+      MeasurementSchema schema,
       ByteBuffer pageData,
-      Map<Long, Map<IMeasurementSchema, ChunkWriterImpl>> chunkWritersInChunkGroup)
+      Map<Long, ChunkWriterImpl> partitionChunkWriterMap)
       throws IOException {
     valueDecoder.reset();
     PageReaderV2 pageReader =
         new PageReaderV2(pageData, schema.getType(), valueDecoder, defaultTimeDecoder, null);
     BatchData batchData = pageReader.getAllSatisfiedPageData();
-    rewritePageIntoFiles(batchData, schema, chunkWritersInChunkGroup);
+    rewritePageIntoFiles(batchData, schema, partitionChunkWriterMap);
   }
 
   /** check if the file to be upgraded has correct magic strings and version number */