Posted to reviews@iotdb.apache.org by GitBox <gi...@apache.org> on 2020/05/10 08:49:35 UTC

[GitHub] [incubator-iotdb] qiaojialin commented on a change in pull request #983: Online Upgrade Tool

qiaojialin commented on a change in pull request #983:
URL: https://github.com/apache/incubator-iotdb/pull/983#discussion_r422580421



##########
File path: server/src/main/java/org/apache/iotdb/db/engine/version/SimpleFileVersionController.java
##########
@@ -54,6 +55,15 @@ public SimpleFileVersionController(String directoryPath, long timePartitionId)
     restore();
   }
 
+  /**
+   * only used for upgrading
+   */
+  public SimpleFileVersionController(String directoryPath) throws IOException {
+    this.directoryPath = directoryPath;

Review comment:
       It would be better to store this file in a directoryPath/upgrade folder, to be consistent with the other partitions.
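       A rough sketch of what that could look like (the "upgrade" folder name and the restore() call are assumptions for illustration):

    ```java
    // Sketch: keep the upgrade version file under directoryPath/upgrade,
    // mirroring the per-partition layout used by the two-arg constructor.
    public SimpleFileVersionController(String directoryPath) throws IOException {
      this.directoryPath = directoryPath + File.separator + "upgrade"; // assumed folder name
      restore(); // assumed, mirroring the two-arg constructor
    }
    ```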

##########
File path: server/src/main/java/org/apache/iotdb/db/metadata/MLogWriter.java
##########
@@ -124,7 +124,8 @@ public void changeOffset(String path, long offset) throws IOException {
   public static void upgradeMLog(String schemaDir, String logFileName) throws IOException {
     File logFile = FSFactoryProducer.getFSFactory()
         .getFile(schemaDir + File.separator + logFileName);
-    File tmpLogFile = new File(logFile.getAbsolutePath() + ".tmp");
+    File tmpLogFile = FSFactoryProducer.getFSFactory()

Review comment:
       FSFactoryProducer is not designed for system files. We need to construct a SystemFileProducer.
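       For illustration only, a hypothetical SystemFileFactory (name and shape are assumptions, not an existing API) could mirror FSFactoryProducer for local-only system files:

    ```java
    import java.io.File;

    // Hypothetical sketch: system files always live on the local file system,
    // so a dedicated factory avoids routing them through FSFactoryProducer.
    public enum SystemFileFactory {
      INSTANCE;

      public File getFile(String pathname) {
        return new File(pathname);
      }
    }
    ```

       MLogWriter could then obtain the tmp file via SystemFileFactory.INSTANCE.getFile(logFile.getAbsolutePath() + ".tmp").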

##########
File path: server/src/main/java/org/apache/iotdb/db/query/reader/chunk/metadata/DiskChunkMetadataLoader.java
##########
@@ -63,6 +63,10 @@ public DiskChunkMetadataLoader(TsFileResource resource, Path seriesPath, QueryCo
             || chunkMetaData.getStartTime() > chunkMetaData.getEndTime());
     return chunkMetadataList;
   }
+  
+  public void setDiskChunkLoader(List<ChunkMetadata> chunkMetadataList) throws IOException {

Review comment:
       Add javadoc: is this only for the upgrade? Document in which cases it will be called.
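       For example, something along these lines (the exact conditions are for the author to confirm):

    ```java
    /**
     * Attach a disk chunk loader to each ChunkMetadata in the given list.
     * Assumption to confirm: only called for chunk metadata materialized
     * from an old TsFile during/after the online upgrade.
     */
    ```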

##########
File path: server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfigCheck.java
##########
@@ -99,48 +101,103 @@ private void checkFile(String filepath) {
           properties.setProperty("iotdb_version", iotdbVersion);
           properties.store(outputStream, "System properties:");
         }
+        checkProperties();

Review comment:
       this check is not needed

##########
File path: server/src/main/java/org/apache/iotdb/db/tools/upgrade/UpgradeTool.java
##########
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.tools.upgrade;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
+
+public class UpgradeTool {

Review comment:
       How about combining this class with TsfileOnlineUpgradeTool?
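       If they were combined, UpgradeTool's only public method could become a static entry point on the merged class, e.g.:

    ```java
    // Sketch: fold UpgradeTool.upgradeOneTsfile into TsfileOnlineUpgradeTool.
    public static void upgradeOneTsfile(String tsFileName, List<TsFileResource> upgradedResources)
        throws IOException, WriteProcessException {
      try (TsfileOnlineUpgradeTool updater = new TsfileOnlineUpgradeTool(tsFileName)) {
        updater.upgradeFile(upgradedResources);
      }
    }
    ```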

##########
File path: server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
##########
@@ -1313,12 +1450,12 @@ private void closeUnsealedTsFileProcessorCallBack(
    */
   public int countUpgradeFiles() {
     int cntUpgradeFileNum = 0;
-    for (TsFileResource seqTsFileResource : sequenceFileTreeSet) {
+    for (TsFileResource seqTsFileResource : upgradeSeqFileList) {
       if (UpgradeUtils.isNeedUpgrade(seqTsFileResource)) {
         cntUpgradeFileNum += 1;

Review comment:
       Why not just use upgradeSeqFileList.size()?
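       Assuming those two lists only ever contain files that need upgrading, the whole method collapses to:

    ```java
    public int countUpgradeFiles() {
      // every file in these lists is pending upgrade by construction
      return upgradeSeqFileList.size() + upgradeUnseqFileList.size();
    }
    ```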

##########
File path: server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
##########
@@ -1313,12 +1450,12 @@ private void closeUnsealedTsFileProcessorCallBack(
    */
   public int countUpgradeFiles() {
     int cntUpgradeFileNum = 0;
-    for (TsFileResource seqTsFileResource : sequenceFileTreeSet) {
+    for (TsFileResource seqTsFileResource : upgradeSeqFileList) {
       if (UpgradeUtils.isNeedUpgrade(seqTsFileResource)) {
         cntUpgradeFileNum += 1;
       }
     }
-    for (TsFileResource unseqTsFileResource : unSequenceFileList) {
+    for (TsFileResource unseqTsFileResource : upgradeUnseqFileList) {
       if (UpgradeUtils.isNeedUpgrade(unseqTsFileResource)) {
         cntUpgradeFileNum += 1;

Review comment:
       Same as above.

##########
File path: server/src/main/java/org/apache/iotdb/db/utils/UpgradeUtils.java
##########
@@ -76,9 +76,13 @@ public static boolean isNeedUpgrade(TsFileResource tsFileResource) {
     return false;
   }

Review comment:
       I cannot see why we need this method. In the merge process, the candidate files should all come from the new files.

##########
File path: server/src/main/java/org/apache/iotdb/db/utils/UpgradeUtils.java
##########
@@ -76,9 +76,13 @@ public static boolean isNeedUpgrade(TsFileResource tsFileResource) {
     return false;
   }
 
-  public static String getUpgradeFileName(File upgradeResource) {
-    return upgradeResource.getParentFile().getParent() + File.separator + TMP_STRING
-        + File.separator + UPGRADE_FILE_PREFIX + upgradeResource.getName();
+  public static String getOneUpgradedFileName(TsFileResource upgradeResource)
+      throws IOException {
+    upgradeResource.deserialize();
+    long firstPartitionId = upgradeResource.getTimePartition();

Review comment:
       I thought the upgraded resource does not have a partition ID. Is upgradeResource an already-upgraded file or one still to be upgraded?

##########
File path: server/src/main/java/org/apache/iotdb/db/utils/UpgradeUtils.java
##########
@@ -76,9 +76,13 @@ public static boolean isNeedUpgrade(TsFileResource tsFileResource) {
     return false;
   }
 
-  public static String getUpgradeFileName(File upgradeResource) {
-    return upgradeResource.getParentFile().getParent() + File.separator + TMP_STRING
-        + File.separator + UPGRADE_FILE_PREFIX + upgradeResource.getName();
+  public static String getOneUpgradedFileName(TsFileResource upgradeResource)

Review comment:
       add javadoc

##########
File path: tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/TimeseriesMetadata.java
##########
@@ -114,10 +118,26 @@ public void setChunkMetadataLoader(IChunkMetadataLoader chunkMetadataLoader) {
     this.chunkMetadataLoader = chunkMetadataLoader;
   }
 
+
   public List<ChunkMetadata> loadChunkMetadataList() throws IOException {
+    if (chunkMetadataList != null) {
+      chunkMetadataLoader.setDiskChunkLoader(chunkMetadataList);
+      return chunkMetadataList;
+    }

Review comment:
       Add javadoc: this branch handles old TsFiles.
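       For example (wording is only a suggestion):

    ```java
    /**
     * If chunkMetadataList is already materialized, this TimeseriesMetadata
     * comes from an old TsFile, so return the cached list directly instead
     * of loading it from disk.
     */
    ```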

##########
File path: server/src/main/java/org/apache/iotdb/db/tools/upgrade/TsfileOnlineUpgradeTool.java
##########
@@ -0,0 +1,624 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.tools.upgrade;
+
+import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+import org.apache.iotdb.tsfile.compress.IUnCompressor;
+import org.apache.iotdb.tsfile.encoding.decoder.Decoder;
+import org.apache.iotdb.tsfile.exception.write.PageException;
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
+import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
+import org.apache.iotdb.tsfile.file.MetaMarker;
+import org.apache.iotdb.tsfile.file.footer.ChunkGroupFooter;
+import org.apache.iotdb.tsfile.file.header.ChunkHeader;
+import org.apache.iotdb.tsfile.file.header.PageHeader;
+import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetadata;
+import org.apache.iotdb.tsfile.file.metadata.OldChunkGroupMetaData;
+import org.apache.iotdb.tsfile.file.metadata.OldChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.OldTsDeviceMetadata;
+import org.apache.iotdb.tsfile.file.metadata.OldTsDeviceMetadataIndex;
+import org.apache.iotdb.tsfile.file.metadata.OldTsFileMetadata;
+import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
+import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.reader.LocalTsFileInput;
+import org.apache.iotdb.tsfile.read.reader.TsFileInput;
+import org.apache.iotdb.tsfile.read.reader.page.PageReader;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl;
+import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+public class TsfileOnlineUpgradeTool implements AutoCloseable {
+
+  private static final Logger logger = LoggerFactory.getLogger(TsfileOnlineUpgradeTool.class);
+
+  private TsFileInput tsFileInput;
+  private long fileMetadataPos;
+  private int fileMetadataSize;
+  private ByteBuffer markerBuffer = ByteBuffer.allocate(Byte.BYTES);
+  protected String file;
+  
+  // PartitionId -> TsFileIOWriter 
+  private Map<Long, TsFileIOWriter> partitionWriterMap;
+
+  /**
+   * Create a file reader of the given file. The reader will read the tail of the file to get the
+   * file metadata size. Then the reader will skip the first TSFileConfig.OLD_MAGIC_STRING.length()
+   * bytes of the file in preparation for reading the actual data.
+   *
+   * @param file the data file
+   * @throws IOException If some I/O error occurs
+   */
+  public TsfileOnlineUpgradeTool(String file) throws IOException {
+    this(file, true);
+  }
+
+  /**
+   * Constructor of TsfileOnlineUpgradeTool.
+   *
+   * @param file the given file path
+   * @param loadMetadataSize whether to load the file metadata size on construction
+   */
+  public TsfileOnlineUpgradeTool(String file, boolean loadMetadataSize) throws IOException {
+    this.file = file;
+    final java.nio.file.Path path = Paths.get(file);
+    tsFileInput = new LocalTsFileInput(path);
+    partitionWriterMap = new HashMap<>();
+    try {
+      if (loadMetadataSize) {
+        loadMetadataSize();
+      }
+    } catch (Exception e) {
+      tsFileInput.close();
+      throw e;
+    }
+  }
+
+  /**
+   * read the file metadata size from the tail of the file and compute its position.
+   */
+  public void loadMetadataSize() throws IOException {
+    ByteBuffer metadataSize = ByteBuffer.allocate(Integer.BYTES);
+    tsFileInput.read(metadataSize,
+        tsFileInput.size() - TSFileConfig.MAGIC_STRING.getBytes().length - Integer.BYTES);
+    metadataSize.flip();
+    // read file metadata size and position
+    fileMetadataSize = ReadWriteIOUtils.readInt(metadataSize);
+    fileMetadataPos =
+        tsFileInput.size() - TSFileConfig.MAGIC_STRING.getBytes().length - Integer.BYTES
+            - fileMetadataSize;
+    // skip the magic header
+    position(TSFileConfig.MAGIC_STRING.length());
+  }
+
+  public String readTailMagic() throws IOException {
+    long totalSize = tsFileInput.size();
+
+    ByteBuffer magicStringBytes = ByteBuffer.allocate(TSFileConfig.MAGIC_STRING.length());
+    tsFileInput.read(magicStringBytes, totalSize - TSFileConfig.MAGIC_STRING.length());
+    magicStringBytes.flip();
+    return new String(magicStringBytes.array());
+  }
+
+  /**
+   * whether the file is a complete TsFile: true only if both the head and tail magic strings exist.
+   */
+  public boolean isComplete() throws IOException {
+    return tsFileInput.size() >= TSFileConfig.MAGIC_STRING.length() * 2 && readTailMagic()
+        .equals(readHeadMagic());
+  }
+
+  /**
+   * this function does not modify the position of the file reader.
+   */
+  public String readHeadMagic() throws IOException {
+    return readHeadMagic(false);
+  }
+
+  /**
+   * @param movePosition whether to move the file reader's position to the end of the magic
+   * head string after reading it.
+   */
+  public String readHeadMagic(boolean movePosition) throws IOException {
+    ByteBuffer magicStringBytes = ByteBuffer.allocate(TSFileConfig.MAGIC_STRING.length());
+    if (movePosition) {
+      tsFileInput.position(0);
+      tsFileInput.read(magicStringBytes);
+    } else {
+      tsFileInput.read(magicStringBytes, 0);
+    }
+    magicStringBytes.flip();
+    return new String(magicStringBytes.array());
+  }
+
+  /**
+   * this function reads version number and checks compatibility of TsFile.
+   */
+  public String readVersionNumber() throws IOException {
+    ByteBuffer versionNumberBytes = ByteBuffer
+        .allocate(TSFileConfig.VERSION_NUMBER.getBytes().length);
+    tsFileInput.position(TSFileConfig.MAGIC_STRING.getBytes().length);
+    tsFileInput.read(versionNumberBytes);
+    versionNumberBytes.flip();
+    return new String(versionNumberBytes.array());
+  }
+
+  /**
+   * this function does not modify the position of the file reader.
+   */
+  public OldTsFileMetadata readFileMetadata() throws IOException {
+    return OldTsFileMetadata.deserializeFrom(readData(fileMetadataPos, fileMetadataSize));
+  }
+
+  /**
+   * this function does not modify the position of the file reader.
+   */
+  public OldTsDeviceMetadata readTsDeviceMetaData(OldTsDeviceMetadataIndex index) throws IOException {
+    return OldTsDeviceMetadata.deserializeFrom(readData(index.getOffset(), index.getLen()));
+  }
+
+  /**
+   * read data from current position of the input, and deserialize it to a CHUNK_GROUP_FOOTER. <br>
+   * This method is not threadsafe.
+   *
+   * @return a CHUNK_GROUP_FOOTER
+   * @throws IOException io error
+   */
+  public ChunkGroupFooter readChunkGroupFooter() throws IOException {
+    return ChunkGroupFooter.deserializeFrom(tsFileInput.wrapAsInputStream(), true);
+  }
+
+  /**
+   * read data from current position of the input, and deserialize it to a CHUNK_HEADER. <br> This
+   * method is not threadsafe.
+   *
+   * @return a CHUNK_HEADER
+   * @throws IOException io error
+   */
+  public ChunkHeader readChunkHeader() throws IOException {
+    return ChunkHeader.deserializeFrom(tsFileInput.wrapAsInputStream(), true, true);
+  }
+
+  /**
+   * not thread safe.
+   *
+   * @param type given tsfile data type
+   */
+  public PageHeader readPageHeader(TSDataType type) throws IOException {
+    return PageHeader.deserializeFrom(tsFileInput.wrapAsInputStream(), type, true);
+  }
+
+  public ByteBuffer readPage(PageHeader header, CompressionType type)
+      throws IOException {
+    ByteBuffer buffer = readData(-1, header.getCompressedSize());
+    IUnCompressor unCompressor = IUnCompressor.getUnCompressor(type);
+    ByteBuffer uncompressedBuffer = ByteBuffer.allocate(header.getUncompressedSize());
+    if (type == CompressionType.UNCOMPRESSED) {
+      return buffer;
+    }
+    unCompressor.uncompress(buffer.array(), buffer.position(), buffer.remaining(),
+        uncompressedBuffer.array(),
+        0);
+    return uncompressedBuffer;
+  }
+  
+  public ByteBuffer readCompressedPage(PageHeader header) throws IOException {
+    return readData(-1, header.getCompressedSize());
+  }
+
+  public long position() throws IOException {
+    return tsFileInput.position();
+  }
+
+  public void position(long offset) throws IOException {
+    tsFileInput.position(offset);
+  }
+
+  /**
+   * read one byte from the input. <br> this method is not thread safe
+   */
+  public byte readMarker() throws IOException {
+    markerBuffer.clear();
+    if (ReadWriteIOUtils.readAsPossible(tsFileInput, markerBuffer) == 0) {
+      throw new IOException("reach the end of the file.");
+    }
+    markerBuffer.flip();
+    return markerBuffer.get();
+  }
+
+  public byte readMarker(long position) throws IOException {
+    return readData(position, Byte.BYTES).get();
+  }
+
+  public void close() throws IOException {
+    this.tsFileInput.close();
+  }
+
+  public String getFileName() {
+    return this.file;
+  }
+
+  /**
+   * read data from tsFileInput, from the current position (if position = -1) or the given
+   * position. <br> if position = -1, the tsFileInput's position will be advanced by the
+   * number of bytes actually read; otherwise, the tsFileInput's position is not changed.
+   *
+   * @param position the start position of data in the tsFileInput, or the current position if
+   * position = -1
+   * @param size the number of bytes to read
+   * @return the data that has been read
+   */
+  private ByteBuffer readData(long position, int size) throws IOException {
+    ByteBuffer buffer = ByteBuffer.allocate(size);
+    if (position == -1) {
+      if (ReadWriteIOUtils.readAsPossible(tsFileInput, buffer) != size) {
+        throw new IOException("reach the end of the data");
+      }
+    } else {
+      if (ReadWriteIOUtils.readAsPossible(tsFileInput, buffer, position, size) != size) {
+        throw new IOException("reach the end of the data");
+      }
+    }
+    buffer.flip();
+    return buffer;
+  }
+
+  /**
+   * upgrade the file and its resource; returns whether the upgrade task completed
+   * @throws IOException if an I/O error occurs
+   * @throws WriteProcessException if writing the upgraded files fails
+   */
+  public boolean upgradeFile(List<TsFileResource> upgradedResources) 
+      throws IOException, WriteProcessException {
+    File oldTsFile = FSFactoryProducer.getFSFactory().getFile(this.file);
+
+    // check if the old TsFile has correct header 
+    if (!fileCheck(oldTsFile)) {
+      return false;
+    }
+
+    // ChunkGroupOffset -> version
+    Map<Long, Long> oldVersionInfo = new HashMap<>();
+
+    // ChunkGroupOffset -> time partition; records the offsets of chunk groups whose data all lie in the same partition
+    Map<Long, Long> chunkGroupTimePartitionInfo = new HashMap<>();
+
+    // scan metadata to get version Info and chunkGroupTimePartitionInfo
+    scanMetadata(oldVersionInfo, chunkGroupTimePartitionInfo);
+    
+    // start to scan chunks and chunkGroups
+    long startOffsetOfChunkGroup = 0;
+    boolean newChunkGroup = true;
+    long versionOfChunkGroup = 0;
+    boolean chunkGroupInSamePartition = false;
+    List<ChunkGroupMetadata> newMetaData = new ArrayList<>();
+    List<List<PageHeader>> pageHeadersInChunkGroup = new ArrayList<>();
+    List<List<ByteBuffer>> dataInChunkGroup = new ArrayList<>();
+    byte marker;
+    List<MeasurementSchema> measurementSchemaList = new ArrayList<>();
+    try {
+      while ((marker = this.readMarker()) != MetaMarker.SEPARATOR) {
+        switch (marker) {
+          case MetaMarker.CHUNK_HEADER:
+            // this is the first chunk of a new ChunkGroup.
+            if (newChunkGroup) {
+              newChunkGroup = false;
+              startOffsetOfChunkGroup = this.position() - 1;
+              versionOfChunkGroup = oldVersionInfo.get(startOffsetOfChunkGroup);
+              chunkGroupInSamePartition = chunkGroupTimePartitionInfo
+                  .containsKey(startOffsetOfChunkGroup);
+            }
+            ChunkHeader header = this.readChunkHeader();
+            MeasurementSchema measurementSchema = new MeasurementSchema(header.getMeasurementID(),
+                header.getDataType(),
+                header.getEncodingType(), 
+                header.getCompressionType());
+            measurementSchemaList.add(measurementSchema);
+            List<PageHeader> pageHeadersInChunk = new ArrayList<>();
+            List<ByteBuffer> dataInChunk = new ArrayList<>();
+            for (int j = 0; j < header.getNumOfPages(); j++) {
+              PageHeader pageHeader = readPageHeader(header.getDataType());
+              ByteBuffer pageData = chunkGroupInSamePartition ? 
+                  readCompressedPage(pageHeader) : readPage(pageHeader, header.getCompressionType());
+              pageHeadersInChunk.add(pageHeader);
+              dataInChunk.add(pageData);
+            }
+            pageHeadersInChunkGroup.add(pageHeadersInChunk);
+            dataInChunkGroup.add(dataInChunk);
+            break;
+          case MetaMarker.CHUNK_GROUP_FOOTER:
+            // this is the footer of a ChunkGroup.
+            ChunkGroupFooter chunkGroupFooter = this.readChunkGroupFooter();
+            String deviceID = chunkGroupFooter.getDeviceID();
+            if (chunkGroupInSamePartition) {
+              quickRewrite(oldTsFile, deviceID, measurementSchemaList, pageHeadersInChunkGroup,
+                  dataInChunkGroup, versionOfChunkGroup, chunkGroupTimePartitionInfo.get(startOffsetOfChunkGroup));
+            } else {
+              rewrite(oldTsFile, deviceID, measurementSchemaList, 
+                dataInChunkGroup, versionOfChunkGroup);
+            }
+
+            pageHeadersInChunkGroup.clear();
+            dataInChunkGroup.clear();
+            measurementSchemaList.clear();
+            newChunkGroup = true;
+            break;
+
+          default:
+            // the disk file is corrupted, using this file may be dangerous
+            logger.error("Unrecognized marker detected, this file may be corrupted");
+            return false;
+        }
+      }
+      // close upgraded tsFiles and generate resources for them
+      for (TsFileIOWriter tsFileIOWriter : partitionWriterMap.values()) {
+        upgradedResources.add(endFileAndGenerateResource(tsFileIOWriter));
+      }
+      return true;
+    } catch (IOException e2) {
+      logger.info("TsFile upgrade process cannot proceed at position {} after {} chunk groups "
+          + "recovered, because : {}", this.position(), newMetaData.size(), e2.getMessage());
+      return false;
+    } finally {
+      if (tsFileInput != null) {
+        tsFileInput.close();
+      }
+    }
+  }
+
+  /**
+   *  Rewrite the chunk group into new TsFiles.
+   *  If the data of this chunk group fall in different time partitions,
+   *  create a new TsFile per partition and rewrite the data into it.
+   */
+  private void rewrite(File oldTsFile, String deviceId, List<MeasurementSchema> schemas, 
+      List<List<ByteBuffer>> dataInChunkGroup, long versionOfChunkGroup) 
+          throws IOException {
+
+    Map<Long, Map<MeasurementSchema, IChunkWriter>> chunkWritersInChunkGroup = new HashMap<>();
+    for (int i = 0; i < schemas.size(); i++) {
+      MeasurementSchema schema = schemas.get(i);
+      Decoder defaultTimeDecoder = Decoder.getDecoderByType(
+          TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder()),
+          TSDataType.INT64);
+      Decoder valueDecoder = Decoder
+          .getDecoderByType(schema.getEncodingType(), schema.getType());
+      List<ByteBuffer> dataInChunk = dataInChunkGroup.get(i);
+      for (ByteBuffer pageData : dataInChunk) {
+        valueDecoder.reset();
+        PageReader pageReader = new PageReader(pageData, schema.getType(), valueDecoder,
+            defaultTimeDecoder, null);
+        BatchData batchData = pageReader.getAllSatisfiedPageData();
+        while (batchData.hasCurrent()) {
+          long time = batchData.currentTime();
+          Object value = batchData.currentValue();
+          long partition = StorageEngine.getTimePartition(time);
+          
+          Map<MeasurementSchema, IChunkWriter> chunkWriters = chunkWritersInChunkGroup.getOrDefault(partition, new HashMap<>());
+          IChunkWriter chunkWriter = chunkWriters.getOrDefault(schema, new ChunkWriterImpl(schema));
+          TsFileIOWriter tsFileIOWriter = getOrDefaultTsFileIOWriter(oldTsFile, partition);
+          partitionWriterMap.put(partition, tsFileIOWriter);
+          switch (schema.getType()) {
+            case INT32:
+              chunkWriter.write(time, (int) value);
+              break;
+            case INT64:
+              chunkWriter.write(time, (long) value);
+              break;
+            case FLOAT:
+              chunkWriter.write(time, (float) value);
+              break;
+            case DOUBLE:
+              chunkWriter.write(time, (double) value);
+              break;
+            case BOOLEAN:
+              chunkWriter.write(time, (boolean) value);
+              break;
+            case TEXT:
+              chunkWriter.write(time, (Binary) value);
+              break;
+            default:
+              throw new UnSupportedDataTypeException(
+                  String.format("Data type %s is not supported.", schema.getType()));
+            }
+          chunkWriters.put(schema, chunkWriter);
+          chunkWritersInChunkGroup.put(partition, chunkWriters);
+          batchData.next();
+        }
+      }
+    }
+    // write the chunk groups and the version info to each upgraded TsFile
+    for (Entry<Long, Map<MeasurementSchema, IChunkWriter>> entry : chunkWritersInChunkGroup.entrySet()) {
+      long partition = entry.getKey();
+      TsFileIOWriter tsFileIOWriter = partitionWriterMap.get(partition);
+      tsFileIOWriter.startChunkGroup(deviceId);
+      for (IChunkWriter chunkWriter : entry.getValue().values()) {
+        chunkWriter.writeToFileWriter(tsFileIOWriter);
+      }
+      tsFileIOWriter.endChunkGroup();
+      tsFileIOWriter.writeVersion(versionOfChunkGroup);
+    }
+  }
+
+  /**
+   * Rewrite a chunk group whose data all belong to one time partition by copying
+   * its compressed pages directly into that partition's new TsFile.
+   * @param oldTsFile
+   * @param deviceId
+   * @param schemas
+   * @param pageHeadersInChunkGroup
+   * @param dataInChunkGroup
+   * @param versionOfChunkGroup
+   * @param partition
+   * @throws IOException
+   * @throws PageException
+   */
+  private void quickRewrite(File oldTsFile, String deviceId, List<MeasurementSchema> schemas, 
+      List<List<PageHeader>> pageHeadersInChunkGroup, List<List<ByteBuffer>> dataInChunkGroup, 
+      long versionOfChunkGroup, long partition) throws IOException, PageException {
+    TsFileIOWriter tsFileIOWriter = getOrDefaultTsFileIOWriter(oldTsFile, partition);
+    tsFileIOWriter.startChunkGroup(deviceId);
+    for (int i = 0; i < schemas.size(); i++) {
+      ChunkWriterImpl chunkWriter = new ChunkWriterImpl(schemas.get(i));
+      List<PageHeader> pageHeaderList = pageHeadersInChunkGroup.get(i);
+      List<ByteBuffer> pageList = dataInChunkGroup.get(i);
+      for (int j = 0; j < pageHeaderList.size(); j++) {
+        chunkWriter.writePageHeaderAndDataIntoBuff(pageList.get(j), pageHeaderList.get(j));
+      }
+      chunkWriter.writeToFileWriter(tsFileIOWriter);
+    }
+    tsFileIOWriter.endChunkGroup();
+    tsFileIOWriter.writeVersion(versionOfChunkGroup);
+  }
+
+  private TsFileIOWriter getOrDefaultTsFileIOWriter(File oldTsFile, long partition) {
+    return partitionWriterMap.computeIfAbsent(partition, k -> 
+      {
+        File partitionDir = FSFactoryProducer.getFSFactory().getFile(oldTsFile.getParent()
+            + File.separator + partition);
+        if (!partitionDir.exists()) {
+          partitionDir.mkdirs();
+        }
+        File newFile = FSFactoryProducer.getFSFactory().getFile(oldTsFile.getParent()
+            + File.separator + partition + File.separator+ oldTsFile.getName());
+        try {
+          if (!newFile.createNewFile()) {
+            logger.error("The TsFile {} has been created ", newFile);
+            return null;
+          }
+          return new TsFileIOWriter(newFile);
+        } catch (IOException e) {
+          logger.error("Create new TsFile {} failed ", newFile);
+          return null;
+        }
+      }
+    );
+  }
+
+  /**
+   *  check if the file to be upgraded has correct magic strings and version number
+   *  @param oldTsFile
+   *  @throws IOException 
+   */
+  private boolean fileCheck(File oldTsFile) throws IOException {
+    long fileSize;
+    if (!oldTsFile.exists()) {
+      logger.error("the file to be updated does not exist, file path: {}", oldTsFile.getPath());
+      return false;
+    } else {
+      fileSize = oldTsFile.length();
+    }
+
+    String magic = readHeadMagic(true);
+    if (!magic.equals(TSFileConfig.MAGIC_STRING)) {
+      logger.error("the file's MAGIC STRING is incorrect, file path: {}", oldTsFile.getPath());
+      return false;
+    }
+    
+    String versionNumber = readVersionNumber();
+    if (!versionNumber.equals(TSFileConfig.OLD_VERSION)) {
+      logger.error("the file's Version Number is incorrect, file path: {}", oldTsFile.getPath());
+      return false;
+    }
+
+    if (fileSize == TSFileConfig.MAGIC_STRING.length()) {
+      logger.error("the file only contains magic string, file path: {}", oldTsFile.getPath());
+      return false;
+    } else if (!readTailMagic().equals(TSFileConfig.MAGIC_STRING)) {
+      logger.error("the file cannot upgrade, file path: {}", oldTsFile.getPath());
+      return false;
+    }
+    return true;
+  }
+
+  private void scanMetadata(Map<Long, Long> oldVersionInfo, 
+      Map<Long, Long> chunkGroupTimePartitionInfo) throws IOException {
+    OldTsFileMetadata fileMetadata = readFileMetadata();
+    List<OldTsDeviceMetadata> oldDeviceMetadataList = new ArrayList<>();
+    for (OldTsDeviceMetadataIndex index : fileMetadata.getDeviceMap().values()) {
+      OldTsDeviceMetadata oldDeviceMetadata = readTsDeviceMetaData(index);
+      oldDeviceMetadataList.add(oldDeviceMetadata);
+    }
+
+    for (OldTsDeviceMetadata oldTsDeviceMetadata : oldDeviceMetadataList) {
+      for (OldChunkGroupMetaData oldChunkGroupMetadata : oldTsDeviceMetadata
+          .getChunkGroupMetaDataList()) {
+        long version = oldChunkGroupMetadata.getVersion();
+        long offsetOfChunkGroupMetaData = oldChunkGroupMetadata.getStartOffsetOfChunkGroup();

Review comment:
       ```suggestion
           long offsetOfChunkGroup = oldChunkGroupMetadata.getStartOffsetOfChunkGroup();
       ```

##########
File path: server/src/main/java/org/apache/iotdb/db/engine/upgrade/UpgradeTask.java
##########
@@ -40,30 +47,69 @@ public UpgradeTask(TsFileResource upgradeResource) {
   @Override
   public void runMayThrow() {
     try {
-      upgradeResource.getWriteQueryLock().readLock().lock();
-      String tsfilePathBefore = upgradeResource.getFile().getAbsolutePath();
-      String tsfilePathAfter = UpgradeUtils.getUpgradeFileName(upgradeResource.getFile());
-
-      UpgradeLog.writeUpgradeLogFile(
-          tsfilePathBefore + COMMA_SEPERATOR + UpgradeCheckStatus.BEGIN_UPGRADE_FILE);
+      List<TsFileResource> upgradedResources = generateUpgradedFiles();
       upgradeResource.getWriteQueryLock().writeLock().lock();
+      String oldTsfilePath = upgradeResource.getFile().getAbsolutePath();
       try {
-        FSFactoryProducer.getFSFactory().getFile(tsfilePathBefore).delete();
-        FSFactoryProducer.getFSFactory()
-            .moveFile(FSFactoryProducer.getFSFactory().getFile(tsfilePathAfter),
-                FSFactoryProducer.getFSFactory().getFile(tsfilePathBefore));
+        // delete old TsFile
+        upgradeResource.remove();
+        // move upgraded TsFiles to their own partition directories
+        for (TsFileResource upgradedResource : upgradedResources) {
+          File upgradedFile = upgradedResource.getFile();
+          long partition = upgradedResource.getTimePartitionWithCheck();

Review comment:
       The upgradedResources are under our control, so there is no need to get the time partition with a check.
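       i.e., the plain getter should suffice here:

    ```java
    // the upgraded files were just written by the upgrade tool itself,
    // so no consistency check is needed
    long partition = upgradedResource.getTimePartition();
    ```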

##########
File path: server/src/main/java/org/apache/iotdb/db/tools/upgrade/TsfileOnlineUpgradeTool.java
##########
@@ -0,0 +1,624 @@
[... diff context identical to the hunk quoted above omitted ...]
+  /**
+   * upgrade the file and its resource; returns whether the upgrade task completed
+   * @throws IOException if an I/O error occurs
+   * @throws WriteProcessException if writing the upgraded files fails
+   */
+  public boolean upgradeFile(List<TsFileResource> upgradedResources) 
+      throws IOException, WriteProcessException {
+    File oldTsFile = FSFactoryProducer.getFSFactory().getFile(this.file);
+
+    // check if the old TsFile has correct header 
+    if (!fileCheck(oldTsFile)) {
+      return false;
+    }
+
+    // ChunkGroupOffset -> version
+    Map<Long, Long> oldVersionInfo = new HashMap<>();

Review comment:
       This could be deduplicated; not a big problem.

##########
File path: server/src/main/java/org/apache/iotdb/db/tools/upgrade/UpgradeTool.java
##########
@@ -0,0 +1,45 @@
[... license header and imports identical to the hunk quoted above omitted ...]
+public class UpgradeTool {
+  
+  private UpgradeTool() {
+  }
+
+  /**
+   * upgrade a single tsfile
+   *
+   * @param tsFileName the old-version TsFile's absolute path
+   * @param upgradedResources the new-version TsFiles' resources
+   * @throws IOException if an I/O error occurs
+   * @throws WriteProcessException if writing the upgraded files fails
+   */
+  public static void upgradeOneTsfile(String tsFileName, List<TsFileResource> upgradedResources) 
+      throws IOException, WriteProcessException {
+    try (TsfileOnlineUpgradeTool updater = new TsfileOnlineUpgradeTool(tsFileName)) {
+      updater.upgradeFile(upgradedResources);

Review comment:
       I suggest not returning a boolean, because we usually do not handle it. Check the code to see whether you clean up crashed files when restarting. Do you handle crashed upgraded files by overwriting them?
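       A sketch of the suggested signature, reporting failure via exceptions instead of a boolean that callers tend to ignore:

    ```java
    public void upgradeFile(List<TsFileResource> upgradedResources)
        throws IOException, WriteProcessException;
    ```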

##########
File path: server/src/main/java/org/apache/iotdb/db/engine/upgrade/UpgradeTask.java
##########
@@ -40,30 +47,69 @@ public UpgradeTask(TsFileResource upgradeResource) {
   @Override
   public void runMayThrow() {
     try {
-      upgradeResource.getWriteQueryLock().readLock().lock();
-      String tsfilePathBefore = upgradeResource.getFile().getAbsolutePath();
-      String tsfilePathAfter = UpgradeUtils.getUpgradeFileName(upgradeResource.getFile());
-
-      UpgradeLog.writeUpgradeLogFile(
-          tsfilePathBefore + COMMA_SEPERATOR + UpgradeCheckStatus.BEGIN_UPGRADE_FILE);
+      List<TsFileResource> upgradedResources = generateUpgradedFiles();
       upgradeResource.getWriteQueryLock().writeLock().lock();
+      String oldTsfilePath = upgradeResource.getFile().getAbsolutePath();
       try {
-        FSFactoryProducer.getFSFactory().getFile(tsfilePathBefore).delete();
-        FSFactoryProducer.getFSFactory()
-            .moveFile(FSFactoryProducer.getFSFactory().getFile(tsfilePathAfter),
-                FSFactoryProducer.getFSFactory().getFile(tsfilePathBefore));
+        // delete old TsFile
+        upgradeResource.remove();
+        // move upgraded TsFiles to their own partition directories
+        for (TsFileResource upgradedResource : upgradedResources) {
+          File upgradedFile = upgradedResource.getFile();
+          long partition = upgradedResource.getTimePartitionWithCheck();
+          String storageGroupPath = upgradedFile.getParentFile().getParentFile().getParent();
+          File partitionDir = FSFactoryProducer.getFSFactory().getFile(storageGroupPath, partition + "");
+          if (!partitionDir.exists()) {
+            partitionDir.mkdir();
+          }
+          FSFactoryProducer.getFSFactory().moveFile(upgradedFile,
+              FSFactoryProducer.getFSFactory().getFile(partitionDir, upgradedFile.getName()));
+          upgradedResource.setFile(
+              FSFactoryProducer.getFSFactory().getFile(partitionDir, upgradedFile.getName()));
+          upgradedResource.serialize();
+          // delete tmp partition folder when it is empty
+          if (upgradedFile.getParentFile().isDirectory() 
+              && upgradedFile.getParentFile().listFiles().length == 0) {

Review comment:
       This may produce a NullPointerException; better to add a null check.
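       File.listFiles() returns null when the path is not a directory or an I/O error occurs, so a guard like this avoids the NPE:

    ```java
    File tmpPartitionDir = upgradedFile.getParentFile();
    File[] residualFiles = tmpPartitionDir.listFiles();
    // listFiles() returns null on I/O error or if tmpPartitionDir is not a directory
    if (residualFiles != null && residualFiles.length == 0) {
      tmpPartitionDir.delete(); // remove the now-empty tmp partition folder
    }
    ```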

##########
File path: server/src/main/java/org/apache/iotdb/db/tools/upgrade/TsfileOnlineUpgradeTool.java
##########
@@ -0,0 +1,624 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.tools.upgrade;
+
+import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+import org.apache.iotdb.tsfile.compress.IUnCompressor;
+import org.apache.iotdb.tsfile.encoding.decoder.Decoder;
+import org.apache.iotdb.tsfile.exception.write.PageException;
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
+import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
+import org.apache.iotdb.tsfile.file.MetaMarker;
+import org.apache.iotdb.tsfile.file.footer.ChunkGroupFooter;
+import org.apache.iotdb.tsfile.file.header.ChunkHeader;
+import org.apache.iotdb.tsfile.file.header.PageHeader;
+import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetadata;
+import org.apache.iotdb.tsfile.file.metadata.OldChunkGroupMetaData;
+import org.apache.iotdb.tsfile.file.metadata.OldChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.OldTsDeviceMetadata;
+import org.apache.iotdb.tsfile.file.metadata.OldTsDeviceMetadataIndex;
+import org.apache.iotdb.tsfile.file.metadata.OldTsFileMetadata;
+import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
+import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.reader.LocalTsFileInput;
+import org.apache.iotdb.tsfile.read.reader.TsFileInput;
+import org.apache.iotdb.tsfile.read.reader.page.PageReader;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl;
+import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+public class TsfileOnlineUpgradeTool implements AutoCloseable {
+
+  private static final Logger logger = LoggerFactory.getLogger(TsfileOnlineUpgradeTool.class);
+
+  private TsFileInput tsFileInput;
+  private long fileMetadataPos;
+  private int fileMetadataSize;
+  private ByteBuffer markerBuffer = ByteBuffer.allocate(Byte.BYTES);
+  protected String file;
+  
+  // PartitionId -> TsFileIOWriter 
+  private Map<Long, TsFileIOWriter> partitionWriterMap;
+
+  /**
+   * Create a file reader of the given file. The reader will read the tail of the file to get the
+   * file metadata size.Then the reader will skip the first TSFileConfig.OLD_MAGIC_STRING.length()
+   * bytes of the file for preparing reading real data.
+   *
+   * @param file the data file
+   * @throws IOException If some I/O error occurs
+   */
+  public TsfileOnlineUpgradeTool(String file) throws IOException {
+    this(file, true);
+  }
+
+  /**
+   * construct function for TsfileOnlineUpgradeTool.
+   *
+   * @param file -given file name
+   * @param loadMetadataSize -load meta data size
+   */
+  public TsfileOnlineUpgradeTool(String file, boolean loadMetadataSize) throws IOException {
+    this.file = file;
+    final java.nio.file.Path path = Paths.get(file);
+    tsFileInput = new LocalTsFileInput(path);
+    partitionWriterMap = new HashMap<>();
+    try {
+      if (loadMetadataSize) {
+        loadMetadataSize();
+      }
+    } catch (Exception e) {
+      tsFileInput.close();
+      throw e;
+    }
+  }
+
+  /**
+   * 
+   */
+  public void loadMetadataSize() throws IOException {
+    ByteBuffer metadataSize = ByteBuffer.allocate(Integer.BYTES);
+    tsFileInput.read(metadataSize,
+        tsFileInput.size() - TSFileConfig.MAGIC_STRING.getBytes().length - Integer.BYTES);
+    metadataSize.flip();
+    // read file metadata size and position
+    fileMetadataSize = ReadWriteIOUtils.readInt(metadataSize);
+    fileMetadataPos =
+        tsFileInput.size() - TSFileConfig.MAGIC_STRING.getBytes().length - Integer.BYTES
+            - fileMetadataSize;
+    // skip the magic header
+    position(TSFileConfig.MAGIC_STRING.length());
+  }
+
+  public String readTailMagic() throws IOException {
+    long totalSize = tsFileInput.size();
+
+    ByteBuffer magicStringBytes = ByteBuffer.allocate(TSFileConfig.MAGIC_STRING.length());
+    tsFileInput.read(magicStringBytes, totalSize - TSFileConfig.MAGIC_STRING.length());
+    magicStringBytes.flip();
+    return new String(magicStringBytes.array());
+  }
+
+  /**
+   * whether the file is a complete TsFile: only if the head magic and tail magic string exists.
+   */
+  public boolean isComplete() throws IOException {
+    return tsFileInput.size() >= TSFileConfig.MAGIC_STRING.length() * 2 && readTailMagic()
+        .equals(readHeadMagic());
+  }
+
+  /**
+   * this function does not modify the position of the file reader.
+   */
+  public String readHeadMagic() throws IOException {
+    return readHeadMagic(false);
+  }
+
+  /**
+   * @param movePosition whether move the position of the file reader after reading the magic header
+   * to the end of the magic head string.
+   */
+  public String readHeadMagic(boolean movePosition) throws IOException {
+    ByteBuffer magicStringBytes = ByteBuffer.allocate(TSFileConfig.MAGIC_STRING.length());
+    if (movePosition) {
+      tsFileInput.position(0);
+      tsFileInput.read(magicStringBytes);
+    } else {
+      tsFileInput.read(magicStringBytes, 0);
+    }
+    magicStringBytes.flip();
+    return new String(magicStringBytes.array());
+  }
+
+  /**
+   * this function reads version number and checks compatibility of TsFile.
+   */
+  public String readVersionNumber() throws IOException {
+    ByteBuffer versionNumberBytes = ByteBuffer
+        .allocate(TSFileConfig.VERSION_NUMBER.getBytes().length);
+    tsFileInput.position(TSFileConfig.MAGIC_STRING.getBytes().length);
+    tsFileInput.read(versionNumberBytes);
+    versionNumberBytes.flip();
+    return new String(versionNumberBytes.array());
+  }
+
+  /**
+   * this function does not modify the position of the file reader.
+   */
+  public OldTsFileMetadata readFileMetadata() throws IOException {
+    return OldTsFileMetadata.deserializeFrom(readData(fileMetadataPos, fileMetadataSize));
+  }
+
+  /**
+   * this function does not modify the position of the file reader.
+   */
+  public OldTsDeviceMetadata readTsDeviceMetaData(OldTsDeviceMetadataIndex index) throws IOException {
+    return OldTsDeviceMetadata.deserializeFrom(readData(index.getOffset(), index.getLen()));
+  }
+
+  /**
+   * read data from the current position of the input, and deserialize it to a CHUNK_GROUP_FOOTER.
+   * <br> This method is not thread safe.
+   *
+   * @return a CHUNK_GROUP_FOOTER
+   * @throws IOException io error
+   */
+  public ChunkGroupFooter readChunkGroupFooter() throws IOException {
+    return ChunkGroupFooter.deserializeFrom(tsFileInput.wrapAsInputStream(), true);
+  }
+
+  /**
+   * read data from the current position of the input, and deserialize it to a CHUNK_HEADER. <br>
+   * This method is not thread safe.
+   *
+   * @return a CHUNK_HEADER
+   * @throws IOException io error
+   */
+  public ChunkHeader readChunkHeader() throws IOException {
+    return ChunkHeader.deserializeFrom(tsFileInput.wrapAsInputStream(), true, true);
+  }
+
+  /**
+   * not thread safe.
+   *
+   * @param type given tsfile data type
+   */
+  public PageHeader readPageHeader(TSDataType type) throws IOException {
+    return PageHeader.deserializeFrom(tsFileInput.wrapAsInputStream(), type, true);
+  }
+
+  public ByteBuffer readPage(PageHeader header, CompressionType type)
+      throws IOException {
+    ByteBuffer buffer = readData(-1, header.getCompressedSize());
+    IUnCompressor unCompressor = IUnCompressor.getUnCompressor(type);
+    ByteBuffer uncompressedBuffer = ByteBuffer.allocate(header.getUncompressedSize());
+    if (type == CompressionType.UNCOMPRESSED) {
+      return buffer;
+    }
+    unCompressor.uncompress(buffer.array(), buffer.position(), buffer.remaining(),
+        uncompressedBuffer.array(),
+        0);
+    return uncompressedBuffer;
+  }
+  
+  public ByteBuffer readCompressedPage(PageHeader header) throws IOException {
+    return readData(-1, header.getCompressedSize());
+  }
+
+  public long position() throws IOException {
+    return tsFileInput.position();
+  }
+
+  public void position(long offset) throws IOException {
+    tsFileInput.position(offset);
+  }
+
+  /**
+   * read one byte from the input. <br> this method is not thread safe
+   */
+  public byte readMarker() throws IOException {
+    markerBuffer.clear();
+    if (ReadWriteIOUtils.readAsPossible(tsFileInput, markerBuffer) == 0) {
+      throw new IOException("reach the end of the file.");
+    }
+    markerBuffer.flip();
+    return markerBuffer.get();
+  }
+
+  public byte readMarker(long position) throws IOException {
+    return readData(position, Byte.BYTES).get();
+  }
+
+  public void close() throws IOException {
+    this.tsFileInput.close();
+  }
+
+  public String getFileName() {
+    return this.file;
+  }
+
+  /**
+   * read data from tsFileInput, from the current position (if position = -1) or from the given
+   * position. <br> if position = -1, the tsFileInput's position will be advanced by the real
+   * data size that has been read. Otherwise, the tsFileInput's position is not changed.
+   *
+   * @param position the start position of data in the tsFileInput, or the current position if
+   * position = -1
+   * @param size the size of the data to read
+   * @return the data that has been read
+   */
+  private ByteBuffer readData(long position, int size) throws IOException {
+    ByteBuffer buffer = ByteBuffer.allocate(size);
+    if (position == -1) {
+      if (ReadWriteIOUtils.readAsPossible(tsFileInput, buffer) != size) {
+        throw new IOException("reach the end of the data");
+      }
+    } else {
+      if (ReadWriteIOUtils.readAsPossible(tsFileInput, buffer, position, size) != size) {
+        throw new IOException("reach the end of the data");
+      }
+    }
+    buffer.flip();
+    return buffer;
+  }
+
+  /**
+   * upgrade the file and its resource; returns whether the upgrade task completed successfully
+   * @throws IOException if an I/O error occurs
+   * @throws WriteProcessException if rewriting the data fails
+   */
+  public boolean upgradeFile(List<TsFileResource> upgradedResources) 
+      throws IOException, WriteProcessException {
+    File oldTsFile = FSFactoryProducer.getFSFactory().getFile(this.file);
+
+    // check if the old TsFile has correct header 
+    if (!fileCheck(oldTsFile)) {
+      return false;
+    }
+
+    // ChunkGroupOffset -> version
+    Map<Long, Long> oldVersionInfo = new HashMap<>();
+
+    // ChunkGroupOffset -> time partition; records the offsets of chunk groups whose data are all in the same partition
+    Map<Long, Long> chunkGroupTimePartitionInfo = new HashMap<>();
+
+    // scan metadata to get version Info and chunkGroupTimePartitionInfo
+    scanMetadata(oldVersionInfo, chunkGroupTimePartitionInfo);

Review comment:
       I prefer to do this check for each page header. 
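
       For instance, the same-partition decision could move into the page loop, roughly like this
       (a sketch only; it assumes PageHeader exposes getStartTime()/getEndTime(), which may differ
       from the actual statistics accessors):

           PageHeader pageHeader = readPageHeader(header.getDataType());
           // decide per page whether its data stays within a single time partition
           long startPartition = StorageEngine.getTimePartition(pageHeader.getStartTime());
           long endPartition = StorageEngine.getTimePartition(pageHeader.getEndTime());
           boolean pageInSamePartition = startPartition == endPartition;
           ByteBuffer pageData = pageInSamePartition
               ? readCompressedPage(pageHeader)                      // fast path: copy compressed bytes as-is
               : readPage(pageHeader, header.getCompressionType());  // slow path: decode, then re-split by partition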

##########
File path: server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
##########
@@ -1106,6 +1220,18 @@ public void writeUnlock() {
         closeQueryLock.readLock().unlock();
       }
     }
+    // upgrade files are old files, which must already be closed
+    for (TsFileResource tsFileResource : upgradeTsFileResources) {
+      if (!isTsFileResourceSatisfied(tsFileResource, deviceId, timeFilter)) {
+        continue;
+      }
+      closeQueryLock.readLock().lock();
+      try {
+        tsfileResourcesForQuery.add(tsFileResource);
+      } finally {
+        closeQueryLock.readLock().unlock();
+      }
+    }

Review comment:
       Acquiring the lock once is enough here.
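
       A sketch of that shape, reusing the identifiers from the hunk above (not the PR's actual code):

           closeQueryLock.readLock().lock();
           try {
             for (TsFileResource tsFileResource : upgradeTsFileResources) {
               if (isTsFileResourceSatisfied(tsFileResource, deviceId, timeFilter)) {
                 tsfileResourcesForQuery.add(tsFileResource);
               }
             }
           } finally {
             closeQueryLock.readLock().unlock();
           }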

##########
File path: server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
##########
@@ -316,40 +377,93 @@ private VersionController getVersionControllerByTimePartitionId(long timePartiti
         });
   }
 
-  private List<TsFileResource> getAllFiles(List<String> folders) {
+  private Pair<List<TsFileResource>, List<TsFileResource>> getAllFiles(List<String> folders) throws IOException {
     List<File> tsFiles = new ArrayList<>();
+    List<File> upgradeFiles = new ArrayList<>();
     for (String baseDir : folders) {
       File fileFolder = fsFactory.getFile(baseDir, storageGroupName);
       if (!fileFolder.exists()) {
         continue;
       }
 
+      // old version
+      // some TsFileResource may be being persisted when the system crashed, try recovering such
+      // resources
+      continueFailedRenames(fileFolder, TEMP_SUFFIX);
+
+      // some TsFiles were going to be replaced by the merged files when the system crashed and
+      // the process was interrupted before the merged files could be named
+      continueFailedRenames(fileFolder, MERGE_SUFFIX);
+
+      File[] oldTsfileArray = fsFactory.listFilesBySuffix(fileFolder.getAbsolutePath(), TSFILE_SUFFIX);
+      File[] oldResourceFileArray = fsFactory.listFilesBySuffix(fileFolder.getAbsolutePath(), TsFileResource.RESOURCE_SUFFIX);
+      File upgradeFolder = fsFactory.getFile(fileFolder, IoTDBConstant.UPGRADE_FOLDER_NAME);
+      // move the old files to the upgrade folder if any exist
+      if (oldTsfileArray.length != 0 || oldResourceFileArray.length != 0) {
+        // create the upgrade directory if it does not exist
+        if (upgradeFolder.mkdirs()) {
+          logger.info("Upgrade Directory {} doesn't exist, create it",
+              upgradeFolder.getPath());
+        } else if (!upgradeFolder.exists()) {
+          logger.error("Create upgrade Directory {} failed",
+              upgradeFolder.getPath());
+        }
+        // move .tsfile to upgrade folder
+        for (File file : oldTsfileArray) {
+          if (!file.renameTo(fsFactory.getFile(upgradeFolder, file.getName()))) {
+            logger.error("Failed to move {} to upgrade folder", file);
+          }
+        }
+        // move .resource to upgrade folder
+        for (File file : oldResourceFileArray) {
+          if (!file.renameTo(fsFactory.getFile(upgradeFolder, file.getName()))) {
+            logger.error("Failed to move {} to upgrade folder", file);
+          }
+        }
+
+        Collections.addAll(upgradeFiles,
+            fsFactory.listFilesBySuffix(upgradeFolder.getAbsolutePath(), TSFILE_SUFFIX));
+      }
+      // if the old files have already been moved to upgradeFolder
+      else if (upgradeFolder.exists()) {
+        Collections.addAll(upgradeFiles,
+            fsFactory.listFilesBySuffix(upgradeFolder.getAbsolutePath(), TSFILE_SUFFIX));
+      }
+
       File[] subFiles = fileFolder.listFiles();
       if (subFiles != null) {
         for (File partitionFolder : subFiles) {
-          // some TsFileResource may be being persisted when the system crashed, try recovering such
-          // resources
-          continueFailedRenames(partitionFolder, TEMP_SUFFIX);
-
-          // some TsFiles were going to be replaced by the merged files when the system crashed and
-          // the process was interrupted before the merged files could be named
-          continueFailedRenames(partitionFolder, MERGE_SUFFIX);
-
-        if (!partitionFolder.isDirectory()) {
-          logger.warn("{} is not a directory.", partitionFolder.getAbsolutePath());
-          continue;
-        }
-
-        Collections.addAll(tsFiles,
-            fsFactory.listFilesBySuffix(partitionFolder.getAbsolutePath(), TSFILE_SUFFIX));
+          if (!partitionFolder.isDirectory()) {
+            logger.warn("{} is not a directory.", partitionFolder.getAbsolutePath());

Review comment:
       Check whether it ends with .tsfile; if it does, do not log a warning.
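
       For example, something like the following (a sketch built from the constants already visible
       in this hunk):

           if (!partitionFolder.isDirectory()) {
             // plain .tsfile entries are expected at this level, so skip them quietly
             if (!partitionFolder.getName().endsWith(TSFILE_SUFFIX)) {
               logger.warn("{} is not a directory.", partitionFolder.getAbsolutePath());
             }
             continue;
           }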

##########
File path: server/src/main/java/org/apache/iotdb/db/tools/upgrade/TsfileOnlineUpgradeTool.java
##########
@@ -0,0 +1,624 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.tools.upgrade;
+
+import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+import org.apache.iotdb.tsfile.compress.IUnCompressor;
+import org.apache.iotdb.tsfile.encoding.decoder.Decoder;
+import org.apache.iotdb.tsfile.exception.write.PageException;
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
+import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
+import org.apache.iotdb.tsfile.file.MetaMarker;
+import org.apache.iotdb.tsfile.file.footer.ChunkGroupFooter;
+import org.apache.iotdb.tsfile.file.header.ChunkHeader;
+import org.apache.iotdb.tsfile.file.header.PageHeader;
+import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetadata;
+import org.apache.iotdb.tsfile.file.metadata.OldChunkGroupMetaData;
+import org.apache.iotdb.tsfile.file.metadata.OldChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.OldTsDeviceMetadata;
+import org.apache.iotdb.tsfile.file.metadata.OldTsDeviceMetadataIndex;
+import org.apache.iotdb.tsfile.file.metadata.OldTsFileMetadata;
+import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
+import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.reader.LocalTsFileInput;
+import org.apache.iotdb.tsfile.read.reader.TsFileInput;
+import org.apache.iotdb.tsfile.read.reader.page.PageReader;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl;
+import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+public class TsfileOnlineUpgradeTool implements AutoCloseable {
+
+  private static final Logger logger = LoggerFactory.getLogger(TsfileOnlineUpgradeTool.class);
+
+  private TsFileInput tsFileInput;
+  private long fileMetadataPos;
+  private int fileMetadataSize;
+  private ByteBuffer markerBuffer = ByteBuffer.allocate(Byte.BYTES);
+
+  /**
+   * upgrade the file and its resource; returns whether the upgrade task completed successfully
+   * @throws IOException if an I/O error occurs
+   * @throws WriteProcessException if rewriting the data fails
+   */
+  public boolean upgradeFile(List<TsFileResource> upgradedResources) 
+      throws IOException, WriteProcessException {
+    File oldTsFile = FSFactoryProducer.getFSFactory().getFile(this.file);
+
+    // check if the old TsFile has correct header 
+    if (!fileCheck(oldTsFile)) {
+      return false;
+    }
+
+    // ChunkGroupOffset -> version
+    Map<Long, Long> oldVersionInfo = new HashMap<>();
+
+    // ChunkGroupOffset -> time partition; records the offsets of chunk groups whose data are all in the same partition
+    Map<Long, Long> chunkGroupTimePartitionInfo = new HashMap<>();
+
+    // scan metadata to get version Info and chunkGroupTimePartitionInfo
+    scanMetadata(oldVersionInfo, chunkGroupTimePartitionInfo);
+    
+    // start to scan chunks and chunkGroups
+    long startOffsetOfChunkGroup = 0;
+    boolean newChunkGroup = true;
+    long versionOfChunkGroup = 0;
+    boolean chunkGroupInSamePartition = false;
+    List<ChunkGroupMetadata> newMetaData = new ArrayList<>();
+    List<List<PageHeader>> pageHeadersInChunkGroup = new ArrayList<>();
+    List<List<ByteBuffer>> dataInChunkGroup = new ArrayList<>();
+    byte marker;
+    List<MeasurementSchema> measurementSchemaList = new ArrayList<>();
+    try {
+      while ((marker = this.readMarker()) != MetaMarker.SEPARATOR) {
+        switch (marker) {
+          case MetaMarker.CHUNK_HEADER:
+            // this is the first chunk of a new ChunkGroup.
+            if (newChunkGroup) {
+              newChunkGroup = false;
+              startOffsetOfChunkGroup = this.position() - 1;
+              versionOfChunkGroup = oldVersionInfo.get(startOffsetOfChunkGroup);
+              chunkGroupInSamePartition = chunkGroupTimePartitionInfo
+                  .containsKey(startOffsetOfChunkGroup);
+            }
+            ChunkHeader header = this.readChunkHeader();
+            MeasurementSchema measurementSchema = new MeasurementSchema(header.getMeasurementID(),
+                header.getDataType(),
+                header.getEncodingType(), 
+                header.getCompressionType());
+            measurementSchemaList.add(measurementSchema);
+            List<PageHeader> pageHeadersInChunk = new ArrayList<>();
+            List<ByteBuffer> dataInChunk = new ArrayList<>();
+            for (int j = 0; j < header.getNumOfPages(); j++) {
+              PageHeader pageHeader = readPageHeader(header.getDataType());
+              ByteBuffer pageData = chunkGroupInSamePartition ? 
+                  readCompressedPage(pageHeader) : readPage(pageHeader, header.getCompressionType());
+              pageHeadersInChunk.add(pageHeader);
+              dataInChunk.add(pageData);
+            }
+            pageHeadersInChunkGroup.add(pageHeadersInChunk);
+            dataInChunkGroup.add(dataInChunk);
+            break;
+          case MetaMarker.CHUNK_GROUP_FOOTER:
+            // this is the footer of a ChunkGroup.
+            ChunkGroupFooter chunkGroupFooter = this.readChunkGroupFooter();
+            String deviceID = chunkGroupFooter.getDeviceID();
+            if (chunkGroupInSamePartition) {
+              quickRewrite(oldTsFile, deviceID, measurementSchemaList, pageHeadersInChunkGroup,
+                  dataInChunkGroup, versionOfChunkGroup, chunkGroupTimePartitionInfo.get(startOffsetOfChunkGroup));
+            } else {
+              rewrite(oldTsFile, deviceID, measurementSchemaList, 
+                dataInChunkGroup, versionOfChunkGroup);
+            }
+
+            pageHeadersInChunkGroup.clear();
+            dataInChunkGroup.clear();
+            measurementSchemaList.clear();
+            newChunkGroup = true;
+            break;
+
+          default:
+            // the disk file is corrupted, using this file may be dangerous
+            logger.error("Unrecognized marker detected, this file may be corrupted");
+            return false;
+        }
+      }
+      // close upgraded tsFiles and generate resources for them
+      for (TsFileIOWriter tsFileIOWriter : partitionWriterMap.values()) {
+        upgradedResources.add(endFileAndGenerateResource(tsFileIOWriter));
+      }
+      return true;
+    } catch (IOException e2) {
+      logger.info("TsFile upgrade process cannot proceed at position {} after {} chunk groups "
+          + "recovered, because : {}", this.position(), newMetaData.size(), e2.getMessage());
+      return false;
+    } finally {
+      if (tsFileInput != null) {
+        tsFileInput.close();
+      }
+    }
+  }
+
+  /**
+   *  Rewrite the chunk group to a new TsFile.
+   *  If the data of this chunk group fall into different time partitions,
+   *  create multiple new TsFiles and rewrite the data of each partition.
+   */
+  private void rewrite(File oldTsFile, String deviceId, List<MeasurementSchema> schemas, 
+      List<List<ByteBuffer>> dataInChunkGroup, long versionOfChunkGroup) 
+          throws IOException {
+
+    Map<Long, Map<MeasurementSchema, IChunkWriter>> chunkWritersInChunkGroup = new HashMap<>();
+    for (int i = 0; i < schemas.size(); i++) {
+      MeasurementSchema schema = schemas.get(i);
+      Decoder defaultTimeDecoder = Decoder.getDecoderByType(
+          TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder()),
+          TSDataType.INT64);
+      Decoder valueDecoder = Decoder
+          .getDecoderByType(schema.getEncodingType(), schema.getType());
+      List<ByteBuffer> dataInChunk = dataInChunkGroup.get(i);
+      for (ByteBuffer pageData : dataInChunk) {
+        valueDecoder.reset();
+        PageReader pageReader = new PageReader(pageData, schema.getType(), valueDecoder,
+            defaultTimeDecoder, null);
+        BatchData batchData = pageReader.getAllSatisfiedPageData();
+        while (batchData.hasCurrent()) {
+          long time = batchData.currentTime();
+          Object value = batchData.currentValue();
+          long partition = StorageEngine.getTimePartition(time);
+          
+          Map<MeasurementSchema, IChunkWriter> chunkWriters = chunkWritersInChunkGroup.getOrDefault(partition, new HashMap<>());
+          IChunkWriter chunkWriter = chunkWriters.getOrDefault(schema, new ChunkWriterImpl(schema));
+          TsFileIOWriter tsFileIOWriter = getOrDefaultTsFileIOWriter(oldTsFile, partition);
+          partitionWriterMap.put(partition, tsFileIOWriter);

Review comment:
       No need to put it again: if the writer is newly created, getOrDefaultTsFileIOWriter will put it into the map and then return it.
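
       If getOrDefaultTsFileIOWriter registers a newly created writer itself, e.g. via
       Map.computeIfAbsent, the explicit put at the call site can simply be dropped. A hypothetical
       sketch of such a helper (the PR's real file-naming logic may differ):

           private TsFileIOWriter getOrDefaultTsFileIOWriter(File oldTsFile, long partition) {
             return partitionWriterMap.computeIfAbsent(partition, p -> {
               try {
                 // hypothetical layout: one upgraded file per partition folder, named after the old file
                 File newFile = FSFactoryProducer.getFSFactory()
                     .getFile(oldTsFile.getParent() + File.separator + p, oldTsFile.getName());
                 if (!newFile.getParentFile().exists()) {
                   newFile.getParentFile().mkdirs();
                 }
                 return new TsFileIOWriter(newFile);
               } catch (IOException e) {
                 throw new java.io.UncheckedIOException(e);
               }
             });
           }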

##########
File path: server/src/main/java/org/apache/iotdb/db/tools/upgrade/TsfileOnlineUpgradeTool.java
##########
@@ -0,0 +1,624 @@
+          switch (schema.getType()) {
+            case INT32:
+              chunkWriter.write(time, (int) value);
+              break;
+            case INT64:
+              chunkWriter.write(time, (long) value);
+              break;
+            case FLOAT:
+              chunkWriter.write(time, (float) value);
+              break;
+            case DOUBLE:
+              chunkWriter.write(time, (double) value);
+              break;
+            case BOOLEAN:
+              chunkWriter.write(time, (boolean) value);
+              break;
+            case TEXT:
+              chunkWriter.write(time, (Binary) value);
+              break;
+            default:
+              throw new UnSupportedDataTypeException(
+                  String.format("Data type %s is not supported.", schema.getType()));
+            }
+          chunkWriters.put(schema, chunkWriter);
+          chunkWritersInChunkGroup.put(partition, chunkWriters);
+          batchData.next();
+        }
+      }
+    }
+    // set version info to each upgraded tsFile 
+    for (Entry<Long, Map<MeasurementSchema, IChunkWriter>> entry : chunkWritersInChunkGroup.entrySet()) {
+      long partition = entry.getKey();
+      TsFileIOWriter tsFileIOWriter = partitionWriterMap.get(partition);
+      tsFileIOWriter.startChunkGroup(deviceId);
+      for (IChunkWriter chunkWriter : entry.getValue().values()) {
+        chunkWriter.writeToFileWriter(tsFileIOWriter);
+      }
+      tsFileIOWriter.endChunkGroup();
+      tsFileIOWriter.writeVersion(versionOfChunkGroup);
+    }
+  }
+
+  /**
+   * 
+   * @param oldTsFile
+   * @param deviceId
+   * @param schemas
+   * @param pageHeadersInChunkGroup
+   * @param dataInChunkGroup
+   * @param versionOfChunkGroup
+   * @param partition
+   * @throws IOException
+   * @throws PageException
+   */
+  private void quickRewrite(File oldTsFile, String deviceId, List<MeasurementSchema> schemas, 

Review comment:
       add javadoc or rename to rewriteCompressedPage
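
       For instance, a short javadoc along these lines would capture the intent (wording is only a
       suggestion):

           /**
            * Rewrite a chunk group whose data all fall into a single time partition.
            * Since no page crosses a partition boundary, each compressed page can be
            * copied to the target file as-is, without decompressing and re-encoding it.
            */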

##########
File path: server/src/main/java/org/apache/iotdb/db/tools/upgrade/TsfileOnlineUpgradeTool.java
##########
@@ -0,0 +1,624 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.tools.upgrade;
+
+import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+import org.apache.iotdb.tsfile.compress.IUnCompressor;
+import org.apache.iotdb.tsfile.encoding.decoder.Decoder;
+import org.apache.iotdb.tsfile.exception.write.PageException;
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
+import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
+import org.apache.iotdb.tsfile.file.MetaMarker;
+import org.apache.iotdb.tsfile.file.footer.ChunkGroupFooter;
+import org.apache.iotdb.tsfile.file.header.ChunkHeader;
+import org.apache.iotdb.tsfile.file.header.PageHeader;
+import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetadata;
+import org.apache.iotdb.tsfile.file.metadata.OldChunkGroupMetaData;
+import org.apache.iotdb.tsfile.file.metadata.OldChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.OldTsDeviceMetadata;
+import org.apache.iotdb.tsfile.file.metadata.OldTsDeviceMetadataIndex;
+import org.apache.iotdb.tsfile.file.metadata.OldTsFileMetadata;
+import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
+import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.reader.LocalTsFileInput;
+import org.apache.iotdb.tsfile.read.reader.TsFileInput;
+import org.apache.iotdb.tsfile.read.reader.page.PageReader;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl;
+import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+public class TsfileOnlineUpgradeTool implements AutoCloseable {
+
+  private static final Logger logger = LoggerFactory.getLogger(TsfileOnlineUpgradeTool.class);
+
+  private TsFileInput tsFileInput;
+  private long fileMetadataPos;
+  private int fileMetadataSize;
+  private ByteBuffer markerBuffer = ByteBuffer.allocate(Byte.BYTES);
+  protected String file;
+  
+  // PartitionId -> TsFileIOWriter 
+  private Map<Long, TsFileIOWriter> partitionWriterMap;
+
+  /**
+   * Create a file reader of the given file. The reader will read the tail of the file to get the
+   * file metadata size.Then the reader will skip the first TSFileConfig.OLD_MAGIC_STRING.length()
+   * bytes of the file for preparing reading real data.
+   *
+   * @param file the data file
+   * @throws IOException If some I/O error occurs
+   */
+  public TsfileOnlineUpgradeTool(String file) throws IOException {
+    this(file, true);
+  }
+
+  /**
+   * construct function for TsfileOnlineUpgradeTool.
+   *
+   * @param file -given file name
+   * @param loadMetadataSize -load meta data size
+   */
+  public TsfileOnlineUpgradeTool(String file, boolean loadMetadataSize) throws IOException {
+    this.file = file;
+    final java.nio.file.Path path = Paths.get(file);
+    tsFileInput = new LocalTsFileInput(path);
+    partitionWriterMap = new HashMap<>();
+    try {
+      if (loadMetadataSize) {
+        loadMetadataSize();
+      }
+    } catch (Exception e) {
+      tsFileInput.close();
+      throw e;
+    }
+  }
+
+  /**
+   * 
+   */
+  public void loadMetadataSize() throws IOException {
+    ByteBuffer metadataSize = ByteBuffer.allocate(Integer.BYTES);
+    tsFileInput.read(metadataSize,
+        tsFileInput.size() - TSFileConfig.MAGIC_STRING.getBytes().length - Integer.BYTES);
+    metadataSize.flip();
+    // read file metadata size and position
+    fileMetadataSize = ReadWriteIOUtils.readInt(metadataSize);
+    fileMetadataPos =
+        tsFileInput.size() - TSFileConfig.MAGIC_STRING.getBytes().length - Integer.BYTES
+            - fileMetadataSize;
+    // skip the magic header
+    position(TSFileConfig.MAGIC_STRING.length());
+  }
+
+  public String readTailMagic() throws IOException {
+    long totalSize = tsFileInput.size();
+
+    ByteBuffer magicStringBytes = ByteBuffer.allocate(TSFileConfig.MAGIC_STRING.length());
+    tsFileInput.read(magicStringBytes, totalSize - TSFileConfig.MAGIC_STRING.length());
+    magicStringBytes.flip();
+    return new String(magicStringBytes.array());
+  }
+
+  /**
+   * whether the file is a complete TsFile: only if the head magic and tail magic string exists.
+   */
+  public boolean isComplete() throws IOException {
+    return tsFileInput.size() >= TSFileConfig.MAGIC_STRING.length() * 2 && readTailMagic()
+        .equals(readHeadMagic());
+  }
+
+  /**
+   * this function does not modify the position of the file reader.
+   */
+  public String readHeadMagic() throws IOException {
+    return readHeadMagic(false);
+  }
+
+  /**
+   * @param movePosition whether move the position of the file reader after reading the magic header
+   * to the end of the magic head string.
+   */
+  public String readHeadMagic(boolean movePosition) throws IOException {
+    ByteBuffer magicStringBytes = ByteBuffer.allocate(TSFileConfig.MAGIC_STRING.length());
+    if (movePosition) {
+      tsFileInput.position(0);
+      tsFileInput.read(magicStringBytes);
+    } else {
+      tsFileInput.read(magicStringBytes, 0);
+    }
+    magicStringBytes.flip();
+    return new String(magicStringBytes.array());
+  }
+
+  /**
+   * this function reads version number and checks compatibility of TsFile.
+   */
+  public String readVersionNumber() throws IOException {
+    ByteBuffer versionNumberBytes = ByteBuffer
+        .allocate(TSFileConfig.VERSION_NUMBER.getBytes().length);
+    tsFileInput.position(TSFileConfig.MAGIC_STRING.getBytes().length);
+    tsFileInput.read(versionNumberBytes);
+    versionNumberBytes.flip();
+    return new String(versionNumberBytes.array());
+  }
+
+  /**
+   * this function does not modify the position of the file reader.
+   */
+  public OldTsFileMetadata readFileMetadata() throws IOException {
+    return OldTsFileMetadata.deserializeFrom(readData(fileMetadataPos, fileMetadataSize));
+  }
+
+  /**
+   * this function does not modify the position of the file reader.
+   */
+  public OldTsDeviceMetadata readTsDeviceMetaData(OldTsDeviceMetadataIndex index) throws IOException {
+    return OldTsDeviceMetadata.deserializeFrom(readData(index.getOffset(), index.getLen()));
+  }
+
+  /**
+   * read data from current position of the input, and deserialize it to a CHUNK_GROUP_FOOTER. <br>
+   * This method is not threadsafe.
+   *
+   * @return a CHUNK_GROUP_FOOTER
+   * @throws IOException io error
+   */
+  public ChunkGroupFooter readChunkGroupFooter() throws IOException {
+    return ChunkGroupFooter.deserializeFrom(tsFileInput.wrapAsInputStream(), true);
+  }
+
+  /**
+   * read data from current position of the input, and deserialize it to a CHUNK_HEADER. <br> This
+   * method is not threadsafe.
+   *
+   * @return a CHUNK_HEADER
+   * @throws IOException io error
+   */
+  public ChunkHeader readChunkHeader() throws IOException {
+    return ChunkHeader.deserializeFrom(tsFileInput.wrapAsInputStream(), true, true);
+  }
+
+  /**
+   * not thread safe.
+   *
+   * @param type given tsfile data type
+   */
+  public PageHeader readPageHeader(TSDataType type) throws IOException {
+    return PageHeader.deserializeFrom(tsFileInput.wrapAsInputStream(), type, true);
+  }
+
+  public ByteBuffer readPage(PageHeader header, CompressionType type)
+      throws IOException {
+    ByteBuffer buffer = readData(-1, header.getCompressedSize());
+    // uncompressed pages can be returned as-is, without allocating a second buffer
+    if (type == CompressionType.UNCOMPRESSED) {
+      return buffer;
+    }
+    IUnCompressor unCompressor = IUnCompressor.getUnCompressor(type);
+    ByteBuffer uncompressedBuffer = ByteBuffer.allocate(header.getUncompressedSize());
+    unCompressor.uncompress(buffer.array(), buffer.position(), buffer.remaining(),
+        uncompressedBuffer.array(), 0);
+    return uncompressedBuffer;
+  }
+  
+  public ByteBuffer readCompressedPage(PageHeader header) throws IOException {
+    return readData(-1, header.getCompressedSize());
+  }
+
+  public long position() throws IOException {
+    return tsFileInput.position();
+  }
+
+  public void position(long offset) throws IOException {
+    tsFileInput.position(offset);
+  }
+
+  /**
+   * read one byte from the input. <br> this method is not thread safe
+   */
+  public byte readMarker() throws IOException {
+    markerBuffer.clear();
+    if (ReadWriteIOUtils.readAsPossible(tsFileInput, markerBuffer) == 0) {
+      throw new IOException("reach the end of the file.");
+    }
+    markerBuffer.flip();
+    return markerBuffer.get();
+  }
+
+  public byte readMarker(long position) throws IOException {
+    return readData(position, Byte.BYTES).get();
+  }
+
+  public void close() throws IOException {
+    this.tsFileInput.close();
+  }
+
+  public String getFileName() {
+    return this.file;
+  }
+
+  /**
+   * read data from tsFileInput, from the current position (if position = -1) or from the given
+   * position. <br> if position = -1, the tsFileInput's position is advanced by the amount of data
+   * actually read; otherwise, the tsFileInput's position is not changed.
+   *
+   * @param position the start position of the data in the tsFileInput, or the current position if
+   * position = -1
+   * @param size the size of the data to read
+   * @return the data that has been read
+   */
+  private ByteBuffer readData(long position, int size) throws IOException {
+    ByteBuffer buffer = ByteBuffer.allocate(size);
+    if (position == -1) {
+      if (ReadWriteIOUtils.readAsPossible(tsFileInput, buffer) != size) {
+        throw new IOException("reach the end of the data");
+      }
+    } else {
+      if (ReadWriteIOUtils.readAsPossible(tsFileInput, buffer, position, size) != size) {
+        throw new IOException("reach the end of the data");
+      }
+    }
+    buffer.flip();
+    return buffer;
+  }
+
+  /**
+   * upgrade the file and generate the upgraded resources; returns whether the upgrade task completed
+   *
+   * @throws IOException if an I/O error occurs
+   * @throws WriteProcessException if writing the upgraded file fails
+   */
+  public boolean upgradeFile(List<TsFileResource> upgradedResources) 
+      throws IOException, WriteProcessException {
+    File oldTsFile = FSFactoryProducer.getFSFactory().getFile(this.file);
+
+    // check if the old TsFile has a correct header
+    if (!fileCheck(oldTsFile)) {
+      return false;
+    }
+
+    // ChunkGroupOffset -> version
+    Map<Long, Long> oldVersionInfo = new HashMap<>();
+
+    // ChunkGroupOffset -> time partition, records the offsets of chunk groups whose data all fall in one partition
+    Map<Long, Long> chunkGroupTimePartitionInfo = new HashMap<>();
+
+    // scan metadata to get the version info and chunkGroupTimePartitionInfo
+    scanMetadata(oldVersionInfo, chunkGroupTimePartitionInfo);
+    
+    // start to scan chunks and chunkGroups
+    long startOffsetOfChunkGroup = 0;
+    boolean newChunkGroup = true;
+    long versionOfChunkGroup = 0;
+    boolean chunkGroupInSamePartition = false;
+    List<ChunkGroupMetadata> newMetaData = new ArrayList<>();
+    List<List<PageHeader>> pageHeadersInChunkGroup = new ArrayList<>();
+    List<List<ByteBuffer>> dataInChunkGroup = new ArrayList<>();
+    byte marker;
+    List<MeasurementSchema> measurementSchemaList = new ArrayList<>();
+    try {
+      while ((marker = this.readMarker()) != MetaMarker.SEPARATOR) {
+        switch (marker) {
+          case MetaMarker.CHUNK_HEADER:
+            // this is the first chunk of a new ChunkGroup.
+            if (newChunkGroup) {
+              newChunkGroup = false;
+              startOffsetOfChunkGroup = this.position() - 1;
+              versionOfChunkGroup = oldVersionInfo.get(startOffsetOfChunkGroup);
+              chunkGroupInSamePartition = chunkGroupTimePartitionInfo
+                  .containsKey(startOffsetOfChunkGroup);
+            }
+            ChunkHeader header = this.readChunkHeader();
+            MeasurementSchema measurementSchema = new MeasurementSchema(header.getMeasurementID(),
+                header.getDataType(),
+                header.getEncodingType(), 
+                header.getCompressionType());
+            measurementSchemaList.add(measurementSchema);
+            List<PageHeader> pageHeadersInChunk = new ArrayList<>();
+            List<ByteBuffer> dataInChunk = new ArrayList<>();
+            for (int j = 0; j < header.getNumOfPages(); j++) {
+              PageHeader pageHeader = readPageHeader(header.getDataType());
+              ByteBuffer pageData = chunkGroupInSamePartition ? 
+                  readCompressedPage(pageHeader) : readPage(pageHeader, header.getCompressionType());
+              pageHeadersInChunk.add(pageHeader);
+              dataInChunk.add(pageData);
+            }
+            pageHeadersInChunkGroup.add(pageHeadersInChunk);
+            dataInChunkGroup.add(dataInChunk);
+            break;
+          case MetaMarker.CHUNK_GROUP_FOOTER:
+            // this is the footer of a ChunkGroup.
+            ChunkGroupFooter chunkGroupFooter = this.readChunkGroupFooter();
+            String deviceID = chunkGroupFooter.getDeviceID();
+            if (chunkGroupInSamePartition) {
+              quickRewrite(oldTsFile, deviceID, measurementSchemaList, pageHeadersInChunkGroup,
+                  dataInChunkGroup, versionOfChunkGroup, chunkGroupTimePartitionInfo.get(startOffsetOfChunkGroup));
+            } else {
+              rewrite(oldTsFile, deviceID, measurementSchemaList, 
+                dataInChunkGroup, versionOfChunkGroup);
+            }
+
+            pageHeadersInChunkGroup.clear();
+            dataInChunkGroup.clear();
+            measurementSchemaList.clear();
+            newChunkGroup = true;
+            break;
+
+          default:
+            // the disk file is corrupted, using this file may be dangerous
+            logger.error("Unrecognized marker detected, this file may be corrupted");
+            return false;
+        }
+      }
+      // close upgraded tsFiles and generate resources for them
+      for (TsFileIOWriter tsFileIOWriter : partitionWriterMap.values()) {
+        upgradedResources.add(endFileAndGenerateResource(tsFileIOWriter));
+      }
+      return true;
+    } catch (IOException e2) {
+      logger.info("TsFile upgrade process cannot proceed at position {} after {} chunk groups "
+          + "recovered, because : {}", this.position(), newMetaData.size(), e2.getMessage());
+      return false;
+    } finally {
+      if (tsFileInput != null) {
+        tsFileInput.close();
+      }
+    }
+  }
+
+  /**
+   *  Rewrite the chunk group to new TsFile.
+   *  If data of this chunk group are in different time partitions,
+   *  create multiple new TsFiles and rewrite data in each partition.
+   */
+  private void rewrite(File oldTsFile, String deviceId, List<MeasurementSchema> schemas, 
+      List<List<ByteBuffer>> dataInChunkGroup, long versionOfChunkGroup) 
+          throws IOException {
+
+    Map<Long, Map<MeasurementSchema, IChunkWriter>> chunkWritersInChunkGroup = new HashMap<>();
+    for (int i = 0; i < schemas.size(); i++) {
+      MeasurementSchema schema = schemas.get(i);
+      Decoder defaultTimeDecoder = Decoder.getDecoderByType(
+          TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder()),
+          TSDataType.INT64);
+      Decoder valueDecoder = Decoder
+          .getDecoderByType(schema.getEncodingType(), schema.getType());
+      List<ByteBuffer> dataInChunk = dataInChunkGroup.get(i);
+      for (ByteBuffer pageData : dataInChunk) {
+        valueDecoder.reset();
+        PageReader pageReader = new PageReader(pageData, schema.getType(), valueDecoder,
+            defaultTimeDecoder, null);
+        BatchData batchData = pageReader.getAllSatisfiedPageData();
+        while (batchData.hasCurrent()) {
+          long time = batchData.currentTime();
+          Object value = batchData.currentValue();
+          long partition = StorageEngine.getTimePartition(time);
+          
+          Map<MeasurementSchema, IChunkWriter> chunkWriters = chunkWritersInChunkGroup.getOrDefault(partition, new HashMap<>());
+          IChunkWriter chunkWriter = chunkWriters.getOrDefault(schema, new ChunkWriterImpl(schema));
+          TsFileIOWriter tsFileIOWriter = getOrDefaultTsFileIOWriter(oldTsFile, partition);
+          partitionWriterMap.put(partition, tsFileIOWriter);
+          switch (schema.getType()) {
+            case INT32:
+              chunkWriter.write(time, (int) value);
+              break;
+            case INT64:
+              chunkWriter.write(time, (long) value);
+              break;
+            case FLOAT:
+              chunkWriter.write(time, (float) value);
+              break;
+            case DOUBLE:
+              chunkWriter.write(time, (double) value);
+              break;
+            case BOOLEAN:
+              chunkWriter.write(time, (boolean) value);
+              break;
+            case TEXT:
+              chunkWriter.write(time, (Binary) value);
+              break;
+            default:
+              throw new UnSupportedDataTypeException(
+                  String.format("Data type %s is not supported.", schema.getType()));
+            }
+          chunkWriters.put(schema, chunkWriter);
+          chunkWritersInChunkGroup.put(partition, chunkWriters);

Review comment:
       no need to put again
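
       As a sketch only (untested, reusing the `chunkWritersInChunkGroup`, `partition` and
       `schema` names from this diff): `Map.computeIfAbsent` stores the freshly created
       value the first time a key is seen, so both trailing put calls become unnecessary:

       ```java
       // computeIfAbsent registers the new map/writer on first access,
       // so neither chunkWritersInChunkGroup.put(...) nor chunkWriters.put(...) is needed
       Map<MeasurementSchema, IChunkWriter> chunkWriters =
           chunkWritersInChunkGroup.computeIfAbsent(partition, k -> new HashMap<>());
       IChunkWriter chunkWriter =
           chunkWriters.computeIfAbsent(schema, ChunkWriterImpl::new);
       ```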

##########
File path: server/src/main/java/org/apache/iotdb/db/tools/upgrade/TsfileOnlineUpgradeTool.java
##########
@@ -0,0 +1,624 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.tools.upgrade;
+
+import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+import org.apache.iotdb.tsfile.compress.IUnCompressor;
+import org.apache.iotdb.tsfile.encoding.decoder.Decoder;
+import org.apache.iotdb.tsfile.exception.write.PageException;
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
+import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
+import org.apache.iotdb.tsfile.file.MetaMarker;
+import org.apache.iotdb.tsfile.file.footer.ChunkGroupFooter;
+import org.apache.iotdb.tsfile.file.header.ChunkHeader;
+import org.apache.iotdb.tsfile.file.header.PageHeader;
+import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetadata;
+import org.apache.iotdb.tsfile.file.metadata.OldChunkGroupMetaData;
+import org.apache.iotdb.tsfile.file.metadata.OldChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.OldTsDeviceMetadata;
+import org.apache.iotdb.tsfile.file.metadata.OldTsDeviceMetadataIndex;
+import org.apache.iotdb.tsfile.file.metadata.OldTsFileMetadata;
+import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
+import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.reader.LocalTsFileInput;
+import org.apache.iotdb.tsfile.read.reader.TsFileInput;
+import org.apache.iotdb.tsfile.read.reader.page.PageReader;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl;
+import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+public class TsfileOnlineUpgradeTool implements AutoCloseable {
+
+  private static final Logger logger = LoggerFactory.getLogger(TsfileOnlineUpgradeTool.class);
+
+  private TsFileInput tsFileInput;
+  private long fileMetadataPos;
+  private int fileMetadataSize;
+  private ByteBuffer markerBuffer = ByteBuffer.allocate(Byte.BYTES);
+  protected String file;
+  
+  // PartitionId -> TsFileIOWriter 
+  private Map<Long, TsFileIOWriter> partitionWriterMap;
+
+  /**
+   * Create a file reader of the given file. The reader will read the tail of the file to get the
+   * file metadata size. Then the reader will skip the first TSFileConfig.OLD_MAGIC_STRING.length()
+   * bytes of the file to prepare for reading the real data.
+   *
+   * @param file the data file
+   * @throws IOException If some I/O error occurs
+   */
+  public TsfileOnlineUpgradeTool(String file) throws IOException {
+    this(file, true);
+  }
+
+  /**
+   * constructor of TsfileOnlineUpgradeTool.
+   *
+   * @param file the given file name
+   * @param loadMetadataSize whether to load the metadata size during construction
+   */
+  public TsfileOnlineUpgradeTool(String file, boolean loadMetadataSize) throws IOException {
+    this.file = file;
+    final java.nio.file.Path path = Paths.get(file);
+    tsFileInput = new LocalTsFileInput(path);
+    partitionWriterMap = new HashMap<>();
+    try {
+      if (loadMetadataSize) {
+        loadMetadataSize();
+      }
+    } catch (Exception e) {
+      tsFileInput.close();
+      throw e;
+    }
+  }
+
+  /**
+   * read the size and start position of the file metadata, then move the reader right after the magic head string.
+   */
+  public void loadMetadataSize() throws IOException {
+    ByteBuffer metadataSize = ByteBuffer.allocate(Integer.BYTES);
+    tsFileInput.read(metadataSize,
+        tsFileInput.size() - TSFileConfig.MAGIC_STRING.getBytes().length - Integer.BYTES);
+    metadataSize.flip();
+    // read file metadata size and position
+    fileMetadataSize = ReadWriteIOUtils.readInt(metadataSize);
+    fileMetadataPos =
+        tsFileInput.size() - TSFileConfig.MAGIC_STRING.getBytes().length - Integer.BYTES
+            - fileMetadataSize;
+    // skip the magic header
+    position(TSFileConfig.MAGIC_STRING.length());
+  }
+
+  public String readTailMagic() throws IOException {
+    long totalSize = tsFileInput.size();
+
+    ByteBuffer magicStringBytes = ByteBuffer.allocate(TSFileConfig.MAGIC_STRING.length());
+    tsFileInput.read(magicStringBytes, totalSize - TSFileConfig.MAGIC_STRING.length());
+    magicStringBytes.flip();
+    return new String(magicStringBytes.array());
+  }
+
+  /**
+   * whether the file is a complete TsFile: true only if both the head magic and the tail magic string exist.
+   */
+  public boolean isComplete() throws IOException {
+    return tsFileInput.size() >= TSFileConfig.MAGIC_STRING.length() * 2 && readTailMagic()
+        .equals(readHeadMagic());
+  }
+
+  /**
+   * this function does not modify the position of the file reader.
+   */
+  public String readHeadMagic() throws IOException {
+    return readHeadMagic(false);
+  }
+
+  /**
+   * @param movePosition whether to move the position of the file reader to the end of the magic
+   * head string after reading it.
+   */
+  public String readHeadMagic(boolean movePosition) throws IOException {
+    ByteBuffer magicStringBytes = ByteBuffer.allocate(TSFileConfig.MAGIC_STRING.length());
+    if (movePosition) {
+      tsFileInput.position(0);
+      tsFileInput.read(magicStringBytes);
+    } else {
+      tsFileInput.read(magicStringBytes, 0);
+    }
+    magicStringBytes.flip();
+    return new String(magicStringBytes.array());
+  }
+
+  /**
+   * this function reads the version number, which can be used to check the compatibility of the TsFile.
+   */
+  public String readVersionNumber() throws IOException {
+    ByteBuffer versionNumberBytes = ByteBuffer
+        .allocate(TSFileConfig.VERSION_NUMBER.getBytes().length);
+    tsFileInput.position(TSFileConfig.MAGIC_STRING.getBytes().length);
+    tsFileInput.read(versionNumberBytes);
+    versionNumberBytes.flip();
+    return new String(versionNumberBytes.array());
+  }
+
+  /**
+   * this function does not modify the position of the file reader.
+   */
+  public OldTsFileMetadata readFileMetadata() throws IOException {
+    return OldTsFileMetadata.deserializeFrom(readData(fileMetadataPos, fileMetadataSize));
+  }
+
+  /**
+   * this function does not modify the position of the file reader.
+   */
+  public OldTsDeviceMetadata readTsDeviceMetaData(OldTsDeviceMetadataIndex index) throws IOException {
+    return OldTsDeviceMetadata.deserializeFrom(readData(index.getOffset(), index.getLen()));
+  }
+
+  /**
+   * read data from the current position of the input and deserialize it into a CHUNK_GROUP_FOOTER. <br>
+   * This method is not threadsafe.
+   *
+   * @return a CHUNK_GROUP_FOOTER
+   * @throws IOException io error
+   */
+  public ChunkGroupFooter readChunkGroupFooter() throws IOException {
+    return ChunkGroupFooter.deserializeFrom(tsFileInput.wrapAsInputStream(), true);
+  }
+
+  /**
+   * read data from the current position of the input and deserialize it into a CHUNK_HEADER. <br> This
+   * method is not threadsafe.
+   *
+   * @return a CHUNK_HEADER
+   * @throws IOException io error
+   */
+  public ChunkHeader readChunkHeader() throws IOException {
+    return ChunkHeader.deserializeFrom(tsFileInput.wrapAsInputStream(), true, true);
+  }
+
+  /**
+   * not thread safe.
+   *
+   * @param type given tsfile data type
+   */
+  public PageHeader readPageHeader(TSDataType type) throws IOException {
+    return PageHeader.deserializeFrom(tsFileInput.wrapAsInputStream(), type, true);
+  }
+
+  public ByteBuffer readPage(PageHeader header, CompressionType type)
+      throws IOException {
+    ByteBuffer buffer = readData(-1, header.getCompressedSize());
+    // uncompressed pages can be returned as-is, without allocating a second buffer
+    if (type == CompressionType.UNCOMPRESSED) {
+      return buffer;
+    }
+    IUnCompressor unCompressor = IUnCompressor.getUnCompressor(type);
+    ByteBuffer uncompressedBuffer = ByteBuffer.allocate(header.getUncompressedSize());
+    unCompressor.uncompress(buffer.array(), buffer.position(), buffer.remaining(),
+        uncompressedBuffer.array(), 0);
+    return uncompressedBuffer;
+  }
+  
+  public ByteBuffer readCompressedPage(PageHeader header) throws IOException {
+    return readData(-1, header.getCompressedSize());
+  }
+
+  public long position() throws IOException {
+    return tsFileInput.position();
+  }
+
+  public void position(long offset) throws IOException {
+    tsFileInput.position(offset);
+  }
+
+  /**
+   * read one byte from the input. <br> this method is not thread safe
+   */
+  public byte readMarker() throws IOException {
+    markerBuffer.clear();
+    if (ReadWriteIOUtils.readAsPossible(tsFileInput, markerBuffer) == 0) {
+      throw new IOException("reach the end of the file.");
+    }
+    markerBuffer.flip();
+    return markerBuffer.get();
+  }
+
+  public byte readMarker(long position) throws IOException {
+    return readData(position, Byte.BYTES).get();
+  }
+
+  public void close() throws IOException {
+    this.tsFileInput.close();
+  }
+
+  public String getFileName() {
+    return this.file;
+  }
+
+  /**
+   * read data from tsFileInput, from the current position (if position = -1) or from the given
+   * position. <br> if position = -1, the tsFileInput's position is advanced by the amount of data
+   * actually read; otherwise, the tsFileInput's position is not changed.
+   *
+   * @param position the start position of the data in the tsFileInput, or the current position if
+   * position = -1
+   * @param size the size of the data to read
+   * @return the data that has been read
+   */
+  private ByteBuffer readData(long position, int size) throws IOException {
+    ByteBuffer buffer = ByteBuffer.allocate(size);
+    if (position == -1) {
+      if (ReadWriteIOUtils.readAsPossible(tsFileInput, buffer) != size) {
+        throw new IOException("reach the end of the data");
+      }
+    } else {
+      if (ReadWriteIOUtils.readAsPossible(tsFileInput, buffer, position, size) != size) {
+        throw new IOException("reach the end of the data");
+      }
+    }
+    buffer.flip();
+    return buffer;
+  }
+
+  /**
+   * upgrade the file and generate the upgraded resources; returns whether the upgrade task completed
+   *
+   * @throws IOException if an I/O error occurs
+   * @throws WriteProcessException if writing the upgraded file fails
+   */
+  public boolean upgradeFile(List<TsFileResource> upgradedResources) 
+      throws IOException, WriteProcessException {
+    File oldTsFile = FSFactoryProducer.getFSFactory().getFile(this.file);
+
+    // check if the old TsFile has a correct header
+    if (!fileCheck(oldTsFile)) {
+      return false;
+    }
+
+    // ChunkGroupOffset -> version
+    Map<Long, Long> oldVersionInfo = new HashMap<>();
+
+    // ChunkGroupOffset -> time partition, records the offsets of chunk groups whose data all fall in one partition
+    Map<Long, Long> chunkGroupTimePartitionInfo = new HashMap<>();
+
+    // scan metadata to get the version info and chunkGroupTimePartitionInfo
+    scanMetadata(oldVersionInfo, chunkGroupTimePartitionInfo);
+    
+    // start to scan chunks and chunkGroups
+    long startOffsetOfChunkGroup = 0;
+    boolean newChunkGroup = true;
+    long versionOfChunkGroup = 0;
+    boolean chunkGroupInSamePartition = false;
+    List<ChunkGroupMetadata> newMetaData = new ArrayList<>();
+    List<List<PageHeader>> pageHeadersInChunkGroup = new ArrayList<>();
+    List<List<ByteBuffer>> dataInChunkGroup = new ArrayList<>();
+    byte marker;
+    List<MeasurementSchema> measurementSchemaList = new ArrayList<>();
+    try {
+      while ((marker = this.readMarker()) != MetaMarker.SEPARATOR) {
+        switch (marker) {
+          case MetaMarker.CHUNK_HEADER:
+            // this is the first chunk of a new ChunkGroup.
+            if (newChunkGroup) {
+              newChunkGroup = false;
+              startOffsetOfChunkGroup = this.position() - 1;
+              versionOfChunkGroup = oldVersionInfo.get(startOffsetOfChunkGroup);
+              chunkGroupInSamePartition = chunkGroupTimePartitionInfo
+                  .containsKey(startOffsetOfChunkGroup);
+            }
+            ChunkHeader header = this.readChunkHeader();
+            MeasurementSchema measurementSchema = new MeasurementSchema(header.getMeasurementID(),
+                header.getDataType(),
+                header.getEncodingType(), 
+                header.getCompressionType());
+            measurementSchemaList.add(measurementSchema);
+            List<PageHeader> pageHeadersInChunk = new ArrayList<>();
+            List<ByteBuffer> dataInChunk = new ArrayList<>();
+            for (int j = 0; j < header.getNumOfPages(); j++) {
+              PageHeader pageHeader = readPageHeader(header.getDataType());
+              ByteBuffer pageData = chunkGroupInSamePartition ? 
+                  readCompressedPage(pageHeader) : readPage(pageHeader, header.getCompressionType());
+              pageHeadersInChunk.add(pageHeader);
+              dataInChunk.add(pageData);
+            }
+            pageHeadersInChunkGroup.add(pageHeadersInChunk);
+            dataInChunkGroup.add(dataInChunk);
+            break;
+          case MetaMarker.CHUNK_GROUP_FOOTER:
+            // this is the footer of a ChunkGroup.
+            ChunkGroupFooter chunkGroupFooter = this.readChunkGroupFooter();
+            String deviceID = chunkGroupFooter.getDeviceID();
+            if (chunkGroupInSamePartition) {
+              quickRewrite(oldTsFile, deviceID, measurementSchemaList, pageHeadersInChunkGroup,
+                  dataInChunkGroup, versionOfChunkGroup, chunkGroupTimePartitionInfo.get(startOffsetOfChunkGroup));
+            } else {
+              rewrite(oldTsFile, deviceID, measurementSchemaList, 
+                dataInChunkGroup, versionOfChunkGroup);
+            }
+
+            pageHeadersInChunkGroup.clear();
+            dataInChunkGroup.clear();
+            measurementSchemaList.clear();
+            newChunkGroup = true;
+            break;
+
+          default:
+            // the disk file is corrupted, using this file may be dangerous
+            logger.error("Unrecognized marker detected, this file may be corrupted");
+            return false;
+        }
+      }
+      // close upgraded tsFiles and generate resources for them
+      for (TsFileIOWriter tsFileIOWriter : partitionWriterMap.values()) {
+        upgradedResources.add(endFileAndGenerateResource(tsFileIOWriter));
+      }
+      return true;
+    } catch (IOException e2) {
+      logger.info("TsFile upgrade process cannot proceed at position {} after {} chunk groups "
+          + "recovered, because : {}", this.position(), newMetaData.size(), e2.getMessage());
+      return false;
+    } finally {
+      if (tsFileInput != null) {
+        tsFileInput.close();
+      }
+    }
+  }
+
+  /**
+   *  Rewrite the chunk group to new TsFile.
+   *  If data of this chunk group are in different time partitions,
+   *  create multiple new TsFiles and rewrite data in each partition.
+   */
+  private void rewrite(File oldTsFile, String deviceId, List<MeasurementSchema> schemas, 

Review comment:
       add javadoc or rename to rewriteUnCompressedPage
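
       e.g. one possible javadoc, sketched from the behavior visible in this diff
       (adjust the wording as needed):

       ```java
       /**
        * Rewrites one chunk group whose data may span multiple time partitions:
        * each page is decompressed and decoded, and every data point is routed to
        * the TsFileIOWriter of its own time partition and re-encoded there.
        * Chunk groups that fall entirely inside one partition take the
        * quickRewrite() path instead, which copies the compressed pages directly.
        */
       ```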




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org