You are viewing a plain-text version of this content. The hyperlink to the canonical version was not preserved in this text rendering.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2021/08/03 03:34:06 UTC

[iotdb] 01/01: Optimize the Upgrade Tool rewrite logic

This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch upgradeMem
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit c5b2ab090c76e588d3f945256357c50735e82543
Author: HTHou <hh...@outlook.com>
AuthorDate: Tue Aug 3 11:29:56 2021 +0800

    Optimize the Upgrade Tool rewrite logic
---
 .../apache/iotdb/db/tools/TsFileRewriteTool.java   | 143 +++++++++++----------
 .../db/tools/upgrade/TsFileOnlineUpgradeTool.java  |  32 ++---
 2 files changed, 89 insertions(+), 86 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/tools/TsFileRewriteTool.java b/server/src/main/java/org/apache/iotdb/db/tools/TsFileRewriteTool.java
index e1e4bc7..54e1560 100644
--- a/server/src/main/java/org/apache/iotdb/db/tools/TsFileRewriteTool.java
+++ b/server/src/main/java/org/apache/iotdb/db/tools/TsFileRewriteTool.java
@@ -1,20 +1,16 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
  */
 package org.apache.iotdb.db.tools;
 
@@ -158,11 +154,9 @@ public class TsFileRewriteTool implements AutoCloseable {
     int headerLength = TSFileConfig.MAGIC_STRING.getBytes().length + Byte.BYTES;
     reader.position(headerLength);
     // start to scan chunks and chunkGroups
-    List<List<PageHeader>> pageHeadersInChunkGroup = new ArrayList<>();
-    List<List<ByteBuffer>> pageDataInChunkGroup = new ArrayList<>();
-    List<List<Boolean>> needToDecodeInfoInChunkGroup = new ArrayList<>();
     byte marker;
-    List<MeasurementSchema> measurementSchemaList = new ArrayList<>();
+
+    Map<Long, Map<MeasurementSchema, ChunkWriterImpl>> chunkWritersInChunkGroup = new HashMap<>();
     String lastChunkGroupDeviceId = null;
     try {
       while ((marker = reader.readMarker()) != MetaMarker.SEPARATOR) {
@@ -176,7 +170,6 @@ public class TsFileRewriteTool implements AutoCloseable {
                     header.getDataType(),
                     header.getEncodingType(),
                     header.getCompressionType());
-            measurementSchemaList.add(measurementSchema);
             TSDataType dataType = header.getDataType();
             TSEncoding encoding = header.getEncodingType();
             List<PageHeader> pageHeadersInChunk = new ArrayList<>();
@@ -194,24 +187,19 @@ public class TsFileRewriteTool implements AutoCloseable {
               dataInChunk.add(pageData);
               dataSize -= pageHeader.getSerializedPageSize();
             }
-            pageHeadersInChunkGroup.add(pageHeadersInChunk);
-            pageDataInChunkGroup.add(dataInChunk);
-            needToDecodeInfoInChunkGroup.add(needToDecodeInfo);
+            reEncodeChunk(
+                measurementSchema,
+                pageHeadersInChunk,
+                dataInChunk,
+                needToDecodeInfo,
+                chunkWritersInChunkGroup);
             break;
           case MetaMarker.CHUNK_GROUP_HEADER:
             ChunkGroupHeader chunkGroupHeader = reader.readChunkGroupHeader();
             String deviceId = chunkGroupHeader.getDeviceID();
-            if (lastChunkGroupDeviceId != null && !measurementSchemaList.isEmpty()) {
-              rewrite(
-                  lastChunkGroupDeviceId,
-                  measurementSchemaList,
-                  pageHeadersInChunkGroup,
-                  pageDataInChunkGroup,
-                  needToDecodeInfoInChunkGroup);
-              pageHeadersInChunkGroup.clear();
-              pageDataInChunkGroup.clear();
-              measurementSchemaList.clear();
-              needToDecodeInfoInChunkGroup.clear();
+            if (lastChunkGroupDeviceId != null && !chunkWritersInChunkGroup.isEmpty()) {
+              reWriteChunkGroupToFile(lastChunkGroupDeviceId, chunkWritersInChunkGroup);
+              chunkWritersInChunkGroup.clear();
             }
             lastChunkGroupDeviceId = deviceId;
             break;
@@ -239,17 +227,9 @@ public class TsFileRewriteTool implements AutoCloseable {
         }
       }
 
-      if (!measurementSchemaList.isEmpty()) {
-        rewrite(
-            lastChunkGroupDeviceId,
-            measurementSchemaList,
-            pageHeadersInChunkGroup,
-            pageDataInChunkGroup,
-            needToDecodeInfoInChunkGroup);
-        pageHeadersInChunkGroup.clear();
-        pageDataInChunkGroup.clear();
-        measurementSchemaList.clear();
-        needToDecodeInfoInChunkGroup.clear();
+      if (!chunkWritersInChunkGroup.isEmpty()) {
+        reWriteChunkGroupToFile(lastChunkGroupDeviceId, chunkWritersInChunkGroup);
+        chunkWritersInChunkGroup.clear();
       }
       // close upgraded tsFiles and generate resources for them
       for (TsFileIOWriter tsFileIOWriter : partitionWriterMap.values()) {
@@ -305,30 +285,61 @@ public class TsFileRewriteTool implements AutoCloseable {
    * this case, we have to decode the data to points, and then rewrite the data points to different
    * chunkWriters, finally write chunks to their own upgraded TsFiles.
    */
-  protected void rewrite(
-      String deviceId,
-      List<MeasurementSchema> schemas,
-      List<List<PageHeader>> pageHeadersInChunkGroup,
-      List<List<ByteBuffer>> dataInChunkGroup,
-      List<List<Boolean>> needToDecodeInfoInChunkGroup)
+  //  protected void rewrite(String deviceId, List<MeasurementSchema> schemas,
+  //      List<List<PageHeader>> pageHeadersInChunkGroup, List<List<ByteBuffer>> dataInChunkGroup,
+  //      List<List<Boolean>> needToDecodeInfoInChunkGroup) throws IOException, PageException {
+  //    Map<Long, Map<MeasurementSchema, ChunkWriterImpl>> chunkWritersInChunkGroup = new
+  // HashMap<>();
+  //    for (int i = 0; i < schemas.size(); i++) {
+  //      MeasurementSchema schema = schemas.get(i);
+  //      List<ByteBuffer> pageDataInChunk = dataInChunkGroup.get(i);
+  //      List<PageHeader> pageHeadersInChunk = pageHeadersInChunkGroup.get(i);
+  //      List<Boolean> needToDecodeInfoInChunk = needToDecodeInfoInChunkGroup.get(i);
+  //      valueDecoder = Decoder.getDecoderByType(schema.getEncodingType(), schema.getType());
+  //      for (int j = 0; j < pageDataInChunk.size(); j++) {
+  //        if (Boolean.TRUE.equals(needToDecodeInfoInChunk.get(j))) {
+  //          decodeAndWritePage(schema, pageDataInChunk.get(j), chunkWritersInChunkGroup);
+  //        } else {
+  //          writePage(schema, pageHeadersInChunk.get(j), pageDataInChunk.get(j),
+  //              chunkWritersInChunkGroup);
+  //        }
+  //      }
+  //    }
+  //
+  //    for (Entry<Long, Map<MeasurementSchema, ChunkWriterImpl>> entry : chunkWritersInChunkGroup
+  //        .entrySet()) {
+  //      long partitionId = entry.getKey();
+  //      TsFileIOWriter tsFileIOWriter = partitionWriterMap.get(partitionId);
+  //      tsFileIOWriter.startChunkGroup(deviceId);
+  //      // write chunks to their own upgraded tsFiles
+  //      for (IChunkWriter chunkWriter : entry.getValue().values()) {
+  //        chunkWriter.writeToFileWriter(tsFileIOWriter);
+  //      }
+  //      tsFileIOWriter.endChunkGroup();
+  //    }
+  //  }
+
+  protected void reEncodeChunk(
+      MeasurementSchema schema,
+      List<PageHeader> pageHeadersInChunk,
+      List<ByteBuffer> pageDataInChunk,
+      List<Boolean> needToDecodeInfoInChunk,
+      Map<Long, Map<MeasurementSchema, ChunkWriterImpl>> chunkWritersInChunkGroup)
       throws IOException, PageException {
-    Map<Long, Map<MeasurementSchema, ChunkWriterImpl>> chunkWritersInChunkGroup = new HashMap<>();
-    for (int i = 0; i < schemas.size(); i++) {
-      MeasurementSchema schema = schemas.get(i);
-      List<ByteBuffer> pageDataInChunk = dataInChunkGroup.get(i);
-      List<PageHeader> pageHeadersInChunk = pageHeadersInChunkGroup.get(i);
-      List<Boolean> needToDecodeInfoInChunk = needToDecodeInfoInChunkGroup.get(i);
-      valueDecoder = Decoder.getDecoderByType(schema.getEncodingType(), schema.getType());
-      for (int j = 0; j < pageDataInChunk.size(); j++) {
-        if (Boolean.TRUE.equals(needToDecodeInfoInChunk.get(j))) {
-          decodeAndWritePageInToFiles(schema, pageDataInChunk.get(j), chunkWritersInChunkGroup);
-        } else {
-          writePageInToFile(
-              schema, pageHeadersInChunk.get(j), pageDataInChunk.get(j), chunkWritersInChunkGroup);
-        }
+    valueDecoder = Decoder.getDecoderByType(schema.getEncodingType(), schema.getType());
+    for (int i = 0; i < pageDataInChunk.size(); i++) {
+      if (Boolean.TRUE.equals(needToDecodeInfoInChunk.get(i))) {
+        decodeAndWritePage(schema, pageDataInChunk.get(i), chunkWritersInChunkGroup);
+      } else {
+        writePage(
+            schema, pageHeadersInChunk.get(i), pageDataInChunk.get(i), chunkWritersInChunkGroup);
       }
     }
+  }
 
+  protected void reWriteChunkGroupToFile(
+      String deviceId, Map<Long, Map<MeasurementSchema, ChunkWriterImpl>> chunkWritersInChunkGroup)
+      throws IOException {
     for (Entry<Long, Map<MeasurementSchema, ChunkWriterImpl>> entry :
         chunkWritersInChunkGroup.entrySet()) {
       long partitionId = entry.getKey();
@@ -380,7 +391,7 @@ public class TsFileRewriteTool implements AutoCloseable {
         });
   }
 
-  protected void writePageInToFile(
+  protected void writePage(
       MeasurementSchema schema,
       PageHeader pageHeader,
       ByteBuffer pageData,
@@ -396,7 +407,7 @@ public class TsFileRewriteTool implements AutoCloseable {
     chunkWritersInChunkGroup.put(partitionId, chunkWriters);
   }
 
-  protected void decodeAndWritePageInToFiles(
+  protected void decodeAndWritePage(
       MeasurementSchema schema,
       ByteBuffer pageData,
       Map<Long, Map<MeasurementSchema, ChunkWriterImpl>> chunkWritersInChunkGroup)
diff --git a/server/src/main/java/org/apache/iotdb/db/tools/upgrade/TsFileOnlineUpgradeTool.java b/server/src/main/java/org/apache/iotdb/db/tools/upgrade/TsFileOnlineUpgradeTool.java
index 90646b5..1c48488 100644
--- a/server/src/main/java/org/apache/iotdb/db/tools/upgrade/TsFileOnlineUpgradeTool.java
+++ b/server/src/main/java/org/apache/iotdb/db/tools/upgrade/TsFileOnlineUpgradeTool.java
@@ -45,6 +45,7 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -96,11 +97,8 @@ public class TsFileOnlineUpgradeTool extends TsFileRewriteTool {
         TSFileConfig.MAGIC_STRING.getBytes().length
             + TSFileConfig.VERSION_NUMBER_V2.getBytes().length;
     reader.position(headerLength);
-    List<List<PageHeader>> pageHeadersInChunkGroup = new ArrayList<>();
-    List<List<ByteBuffer>> pageDataInChunkGroup = new ArrayList<>();
-    List<List<Boolean>> needToDecodeInfoInChunkGroup = new ArrayList<>();
     byte marker;
-    List<MeasurementSchema> measurementSchemaList = new ArrayList<>();
+    Map<Long, Map<MeasurementSchema, ChunkWriterImpl>> chunkWritersInChunkGroup = new HashMap<>();
     try {
       while ((marker = reader.readMarker()) != MetaMarker.SEPARATOR) {
         switch (marker) {
@@ -112,7 +110,6 @@ public class TsFileOnlineUpgradeTool extends TsFileRewriteTool {
                     header.getDataType(),
                     header.getEncodingType(),
                     header.getCompressionType());
-            measurementSchemaList.add(measurementSchema);
             TSDataType dataType = header.getDataType();
             TSEncoding encoding = header.getEncodingType();
             List<PageHeader> pageHeadersInChunk = new ArrayList<>();
@@ -141,25 +138,20 @@ public class TsFileOnlineUpgradeTool extends TsFileRewriteTool {
                       // page data bytes
                       + pageHeader.getCompressedSize());
             }
-            pageHeadersInChunkGroup.add(pageHeadersInChunk);
-            pageDataInChunkGroup.add(dataInChunk);
-            needToDecodeInfoInChunkGroup.add(needToDecodeInfo);
+            reEncodeChunk(
+                measurementSchema,
+                pageHeadersInChunk,
+                dataInChunk,
+                needToDecodeInfo,
+                chunkWritersInChunkGroup);
             break;
           case MetaMarker.CHUNK_GROUP_HEADER:
             // this is the footer of a ChunkGroup in TsFileV2.
             ChunkGroupHeader chunkGroupFooter =
                 ((TsFileSequenceReaderForV2) reader).readChunkGroupFooter();
             String deviceID = chunkGroupFooter.getDeviceID();
-            rewrite(
-                deviceID,
-                measurementSchemaList,
-                pageHeadersInChunkGroup,
-                pageDataInChunkGroup,
-                needToDecodeInfoInChunkGroup);
-            pageHeadersInChunkGroup.clear();
-            pageDataInChunkGroup.clear();
-            measurementSchemaList.clear();
-            needToDecodeInfoInChunkGroup.clear();
+            reWriteChunkGroupToFile(deviceID, chunkWritersInChunkGroup);
+            chunkWritersInChunkGroup.clear();
             break;
           case MetaMarker.VERSION:
             long version = ((TsFileSequenceReaderForV2) reader).readVersion();
@@ -216,7 +208,7 @@ public class TsFileOnlineUpgradeTool extends TsFileRewriteTool {
           currentMod = null;
         }
       }
-    } catch (IOException e2) {
+    } catch (Exception e2) {
       throw new IOException(
           "TsFile upgrade process cannot proceed at position "
               + reader.position()
@@ -252,7 +244,7 @@ public class TsFileOnlineUpgradeTool extends TsFileRewriteTool {
   }
 
   @Override
-  protected void decodeAndWritePageInToFiles(
+  protected void decodeAndWritePage(
       MeasurementSchema schema,
       ByteBuffer pageData,
       Map<Long, Map<MeasurementSchema, ChunkWriterImpl>> chunkWritersInChunkGroup)