Posted to commits@iotdb.apache.org by ha...@apache.org on 2020/03/31 02:53:15 UTC

[incubator-iotdb] 01/01: add the old upgrade tool back

This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch upgrade_tool
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 5b9fb3f47ba69ecd5cbd13bcb944518db18b62ee
Author: HTHou <hh...@outlook.com>
AuthorDate: Tue Mar 31 10:52:18 2020 +0800

    add the old upgrade tool back
---
 .../iotdb/db/tools/upgrade/OfflineUpgradeTool.java |  68 +++
 .../tool/upgrade/TsfileUpgradeToolV0_8_0.java      | 543 +++++++++++++++++++++
 .../iotdb/tsfile/tool/upgrade/UpgradeTool.java     | 108 ++++
 3 files changed, 719 insertions(+)
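
For context, the offline upgrade is driven by the OfflineUpgradeTool class added below: it takes the path of a properties file as its only command-line argument and hands each configured data dir to UpgradeTool. A hypothetical invocation (the classpath and config path are placeholders, not part of this commit) would look like:

    java -cp <server classpath> org.apache.iotdb.db.tools.upgrade.OfflineUpgradeTool /path/to/upgrade.properties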

diff --git a/server/src/main/java/org/apache/iotdb/db/tools/upgrade/OfflineUpgradeTool.java b/server/src/main/java/org/apache/iotdb/db/tools/upgrade/OfflineUpgradeTool.java
new file mode 100644
index 0000000..c63dfd6
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/tools/upgrade/OfflineUpgradeTool.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.tools.upgrade;
+
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
+import org.apache.iotdb.tsfile.tool.upgrade.UpgradeTool;
+
+public class OfflineUpgradeTool {
+
+  private static List<String> oldVersionTsfileDirs = new ArrayList<>();
+  private static List<String> newVersionTsfileDirs = new ArrayList<>();
+  private static int upgradeThreadNum;
+
+  private static void loadProps(String configPath) {
+    InputStream inputStream = null;
+    try {
+      inputStream = new FileInputStream(FSFactoryProducer.getFSFactory().getFile(configPath));
+    } catch (FileNotFoundException e) {
+      System.out.println(String.format("Failed to find config file:%s", configPath));
+      e.printStackTrace();
+      System.exit(1);
+    }
+    Properties properties = new Properties();
+    try {
+      properties.load(inputStream);
+      String oldVersionTsfileDirString = properties.getProperty("old_version_data_dirs");
+      Collections.addAll(oldVersionTsfileDirs, oldVersionTsfileDirString.split(","));
+      String newVersionTsfileDirString = properties.getProperty("new_version_data_dirs");
+      Collections.addAll(newVersionTsfileDirs, newVersionTsfileDirString.split(","));
+      upgradeThreadNum = Integer.parseInt(properties.getProperty("upgrade_thread_num"));
+    } catch (IOException e) {
+      System.out.println(String.format("Cannot load config file:%s", configPath));
+      e.printStackTrace();
+    }
+  }
+
+  public static void main(String[] args) throws IOException {
+    loadProps(args[0]);
+    for (int i = 0; i < oldVersionTsfileDirs.size(); i++) {
+      UpgradeTool.upgradeTsfiles(oldVersionTsfileDirs.get(i), newVersionTsfileDirs.get(i),
+          upgradeThreadNum);
+    }
+  }
+}
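
For reference, a minimal properties file for the tool above, based on the keys read in loadProps (the directory paths and thread count are placeholders):

    # comma-separated dirs holding the old-version TsFiles
    old_version_data_dirs=/path/to/old/data1,/path/to/old/data2
    # comma-separated dirs that the upgraded TsFiles are written to, paired with the old dirs by index
    new_version_data_dirs=/path/to/new/data1,/path/to/new/data2
    # number of threads used for the offline upgrade
    upgrade_thread_num=4

Since main() pairs the i-th old dir with the i-th new dir, both lists must contain the same number of entries.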
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/tool/upgrade/TsfileUpgradeToolV0_8_0.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/tool/upgrade/TsfileUpgradeToolV0_8_0.java
new file mode 100644
index 0000000..328dc3a
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/tool/upgrade/TsfileUpgradeToolV0_8_0.java
@@ -0,0 +1,543 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.tool.upgrade;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
+import org.apache.iotdb.tsfile.compress.ICompressor.SnappyCompressor;
+import org.apache.iotdb.tsfile.compress.IUnCompressor.SnappyUnCompressor;
+import org.apache.iotdb.tsfile.exception.compress.CompressionTypeNotSupportedException;
+import org.apache.iotdb.tsfile.exception.write.PageException;
+import org.apache.iotdb.tsfile.file.MetaMarker;
+import org.apache.iotdb.tsfile.file.footer.ChunkGroupFooter;
+import org.apache.iotdb.tsfile.file.header.ChunkHeader;
+import org.apache.iotdb.tsfile.file.header.PageHeader;
+import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetaData;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
+import org.apache.iotdb.tsfile.file.metadata.TsDeviceMetadata;
+import org.apache.iotdb.tsfile.file.metadata.TsDeviceMetadataIndex;
+import org.apache.iotdb.tsfile.file.metadata.TsFileMetaData;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
+import org.apache.iotdb.tsfile.read.reader.DefaultTsFileInput;
+import org.apache.iotdb.tsfile.read.reader.TsFileInput;
+import org.apache.iotdb.tsfile.utils.ReadWriteForEncodingUtils;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.apache.iotdb.tsfile.write.schema.Schema;
+import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TsfileUpgradeToolV0_8_0 implements AutoCloseable {
+
+  private static final Logger logger = LoggerFactory.getLogger(TsfileUpgradeToolV0_8_0.class);
+
+  private TsFileInput tsFileInput;
+  private long fileMetadataPos;
+  private int fileMetadataSize;
+  private ByteBuffer markerBuffer = ByteBuffer.allocate(Byte.BYTES);
+  protected String file;
+
+  /**
+   * Create a file reader for the given file. The reader will read the tail of the file to get the
+   * file metadata size. Then the reader will skip the first TSFileConfig.OLD_MAGIC_STRING.length()
+   * bytes of the file to prepare for reading the real data.
+   *
+   * @param file the data file
+   * @throws IOException If some I/O error occurs
+   */
+  public TsfileUpgradeToolV0_8_0(String file) throws IOException {
+    this(file, true);
+  }
+
+  /**
+   * Constructor of TsfileUpgradeToolV0_8_0.
+   *
+   * @param file the given file name
+   * @param loadMetadataSize whether to load the metadata size while constructing the reader
+   */
+  public TsfileUpgradeToolV0_8_0(String file, boolean loadMetadataSize) throws IOException {
+    this.file = file;
+    final Path path = Paths.get(file);
+    tsFileInput = new DefaultTsFileInput(path);
+    try {
+      if (loadMetadataSize) {
+        loadMetadataSize(false);
+      }
+    } catch (Throwable e) {
+      tsFileInput.close();
+      throw e;
+    }
+  }
+
+  /**
+   * @param sealedWithNewMagic true when an old-version TsFile is sealed with the new-version MAGIC_STRING
+   */
+  public void loadMetadataSize(boolean sealedWithNewMagic) throws IOException {
+    ByteBuffer metadataSize = ByteBuffer.allocate(Integer.BYTES);
+    if (sealedWithNewMagic) {
+      tsFileInput.read(metadataSize,
+          tsFileInput.size() - TSFileConfig.MAGIC_STRING.getBytes().length - Integer.BYTES);
+      metadataSize.flip();
+      // read file metadata size and position
+      fileMetadataSize = ReadWriteIOUtils.readInt(metadataSize);
+      fileMetadataPos =
+          tsFileInput.size() - TSFileConfig.MAGIC_STRING.getBytes().length - Integer.BYTES
+              - fileMetadataSize;
+    } else {
+      tsFileInput.read(metadataSize,
+          tsFileInput.size() - TSFileConfig.OLD_MAGIC_STRING.length() - Integer.BYTES);
+      metadataSize.flip();
+      // read file metadata size and position
+      fileMetadataSize = ReadWriteIOUtils.readInt(metadataSize);
+      fileMetadataPos = tsFileInput.size() - TSFileConfig.OLD_MAGIC_STRING.length() - Integer.BYTES
+          - fileMetadataSize;
+    }
+    // skip the magic header
+    position(TSFileConfig.OLD_MAGIC_STRING.length());
+  }
+
+  public String readTailMagic() throws IOException {
+    long totalSize = tsFileInput.size();
+
+    ByteBuffer magicStringBytes = ByteBuffer.allocate(TSFileConfig.OLD_MAGIC_STRING.length());
+    tsFileInput.read(magicStringBytes, totalSize - TSFileConfig.OLD_MAGIC_STRING.length());
+    magicStringBytes.flip();
+    return new String(magicStringBytes.array());
+  }
+
+  /**
+   * whether the file is a complete TsFile: true only if both the head magic and the tail magic string exist.
+   */
+  public boolean isComplete() throws IOException {
+    return tsFileInput.size() >= TSFileConfig.OLD_MAGIC_STRING.length() * 2 && readTailMagic()
+        .equals(readHeadMagic());
+  }
+
+  /**
+   * this function does not modify the position of the file reader.
+   */
+  public String readHeadMagic() throws IOException {
+    return readHeadMagic(false);
+  }
+
+  /**
+   * @param movePosition whether to move the position of the file reader to the end of the magic
+   * head string after reading it
+   */
+  public String readHeadMagic(boolean movePosition) throws IOException {
+    ByteBuffer magicStringBytes = ByteBuffer.allocate(TSFileConfig.OLD_MAGIC_STRING.length());
+    if (movePosition) {
+      tsFileInput.position(0);
+      tsFileInput.read(magicStringBytes);
+    } else {
+      tsFileInput.read(magicStringBytes, 0);
+    }
+    magicStringBytes.flip();
+    return new String(magicStringBytes.array());
+  }
+
+  /**
+   * this function does not modify the position of the file reader.
+   */
+  public TsFileMetaData readFileMetadata() throws IOException {
+    ByteBuffer buffer = readData(fileMetadataPos, fileMetadataSize);
+    TsFileMetaData fileMetaData = new TsFileMetaData();
+
+    int size = ReadWriteIOUtils.readInt(buffer);
+    if (size > 0) {
+      Map<String, TsDeviceMetadataIndex> deviceMap = new HashMap<>();
+      String key;
+      TsDeviceMetadataIndex value;
+      for (int i = 0; i < size; i++) {
+        key = ReadWriteIOUtils.readString(buffer);
+        value = TsDeviceMetadataIndex.deserializeFrom(buffer);
+        deviceMap.put(key, value);
+      }
+      fileMetaData.setDeviceIndexMap(deviceMap);
+    }
+
+    size = ReadWriteIOUtils.readInt(buffer);
+    if (size > 0) {
+      fileMetaData.setMeasurementSchema(new HashMap<>());
+      String key;
+      MeasurementSchema value;
+      for (int i = 0; i < size; i++) {
+        key = ReadWriteIOUtils.readString(buffer);
+        value = MeasurementSchema.deserializeFrom(buffer);
+        fileMetaData.getMeasurementSchema().put(key, value);
+      }
+    }
+    // skip the current version of file metadata
+    ReadWriteIOUtils.readInt(buffer);
+
+    if (ReadWriteIOUtils.readIsNull(buffer)) {
+      fileMetaData.setCreatedBy(ReadWriteIOUtils.readString(buffer));
+    }
+
+    return fileMetaData;
+  }
+
+  /**
+   * this function does not modify the position of the file reader.
+   */
+  public TsDeviceMetadata readTsDeviceMetaData(TsDeviceMetadataIndex index) throws IOException {
+    return TsDeviceMetadata.deserializeFrom(readData(index.getOffset(), index.getLen()));
+  }
+
+  /**
+   * read data from current position of the input, and deserialize it to a CHUNK_GROUP_FOOTER. <br>
+   * This method is not threadsafe.
+   *
+   * @return a CHUNK_GROUP_FOOTER
+   * @throws IOException io error
+   */
+  public ChunkGroupFooter readChunkGroupFooter() throws IOException {
+    return ChunkGroupFooter.deserializeFrom(tsFileInput.wrapAsInputStream(), true);
+  }
+
+  /**
+   * read data from current position of the input, and deserialize it to a CHUNK_HEADER. <br> This
+   * method is not threadsafe.
+   *
+   * @return a CHUNK_HEADER
+   * @throws IOException io error
+   */
+  public ChunkHeader readChunkHeader() throws IOException {
+    return ChunkHeader.deserializeFrom(tsFileInput.wrapAsInputStream(), true);
+  }
+
+  /**
+   * not thread safe.
+   *
+   * @param type given tsfile data type
+   */
+  public PageHeader readPageHeader(TSDataType type) throws IOException {
+    return PageHeader.deserializeFrom(tsFileInput.wrapAsInputStream(), type);
+  }
+
+
+  public long position() throws IOException {
+    return tsFileInput.position();
+  }
+
+  public void position(long offset) throws IOException {
+    tsFileInput.position(offset);
+  }
+
+  /**
+   * read one byte from the input. <br> this method is not thread safe
+   */
+  public byte readMarker() throws IOException {
+    markerBuffer.clear();
+    if (ReadWriteIOUtils.readAsPossible(tsFileInput, markerBuffer) == 0) {
+      throw new IOException("reach the end of the file.");
+    }
+    markerBuffer.flip();
+    return markerBuffer.get();
+  }
+
+  public void close() throws IOException {
+    this.tsFileInput.close();
+  }
+
+  public String getFileName() {
+    return this.file;
+  }
+
+  /**
+   * read data from tsFileInput, either from the given position or from the current position (if
+   * position = -1). <br> if position = -1, the tsFileInput's position will be advanced by the
+   * number of bytes actually read; otherwise, the tsFileInput's position is not changed.
+   *
+   * @param position the start position of the data in the tsFileInput, or -1 to read from the
+   * current position
+   * @param size the number of bytes to read
+   * @return the data that has been read
+   */
+  private ByteBuffer readData(long position, int size) throws IOException {
+    ByteBuffer buffer = ByteBuffer.allocate(size);
+    if (position == -1) {
+      if (ReadWriteIOUtils.readAsPossible(tsFileInput, buffer) != size) {
+        throw new IOException("reach the end of the data");
+      }
+    } else {
+      if (ReadWriteIOUtils.readAsPossible(tsFileInput, buffer, position, size) != size) {
+        throw new IOException("reach the end of the data");
+      }
+    }
+    buffer.flip();
+    return buffer;
+  }
+
+  /**
+   * Upgrade the file and return whether the upgrade task completed successfully.
+   */
+  public boolean upgradeFile(String updateFileName) throws IOException {
+    File checkFile = FSFactoryProducer.getFSFactory().getFile(this.file);
+    long fileSize;
+    if (!checkFile.exists()) {
+      logger.error("the file to be updated does not exist, file path: {}", checkFile.getPath());
+      return false;
+    } else {
+      fileSize = checkFile.length();
+    }
+    File upgradeFile = FSFactoryProducer.getFSFactory().getFile(updateFileName);
+    if (!upgradeFile.getParentFile().exists()) {
+      upgradeFile.getParentFile().mkdirs();
+    }
+    upgradeFile.createNewFile();
+    TsFileIOWriter tsFileIOWriter = new TsFileIOWriter(upgradeFile);
+
+    List<ChunkHeader> chunkHeaders = new ArrayList<>();
+    List<List<PageHeader>> pageHeadersList = new ArrayList<>();
+    List<List<ByteBuffer>> pagesList = new ArrayList<>();
+    Schema schema = null;
+
+    String magic = readHeadMagic(true);
+    if (!magic.equals(TSFileConfig.OLD_MAGIC_STRING)) {
+      logger.error("the file's MAGIC STRING is incorrect, file path: {}", checkFile.getPath());
+      return false;
+    }
+
+    if (fileSize == TSFileConfig.OLD_MAGIC_STRING.length()) {
+      logger.error("the file only contains magic string, file path: {}", checkFile.getPath());
+      return false;
+    } else if (readTailMagic().equals(TSFileConfig.OLD_MAGIC_STRING)) {
+      loadMetadataSize(false);
+      TsFileMetaData tsFileMetaData = readFileMetadata();
+      schema = new Schema(tsFileMetaData.getMeasurementSchema());
+    } else {
+      loadMetadataSize(true);
+      TsFileMetaData tsFileMetaData = readFileMetadata();
+      schema = new Schema(tsFileMetaData.getMeasurementSchema());
+    }
+
+    ChunkMetaData currentChunkMetaData;
+    List<ChunkMetaData> chunkMetaDataList = null;
+    long startOffsetOfChunkGroup = 0;
+    boolean newChunkGroup = true;
+    long versionOfChunkGroup = 0;
+    List<ChunkGroupMetaData> newMetaData = new ArrayList<>();
+    List<Statistics<?>> chunkStatisticsList = new ArrayList<>();
+
+    boolean goon = true;
+    byte marker;
+    try {
+      while (goon && (marker = this.readMarker()) != MetaMarker.SEPARATOR) {
+        switch (marker) {
+          case MetaMarker.CHUNK_HEADER:
+            // this is the first chunk of a new ChunkGroup.
+            if (newChunkGroup) {
+              newChunkGroup = false;
+              chunkMetaDataList = new ArrayList<>();
+              startOffsetOfChunkGroup = this.position() - 1;
+            }
+
+            long fileOffsetOfChunk = this.position() - 1;
+            ChunkHeader header = this.readChunkHeader();
+            chunkHeaders.add(header);
+            List<PageHeader> pageHeaders = new ArrayList<>();
+            List<ByteBuffer> pages = new ArrayList<>();
+            TSDataType dataType = header.getDataType();
+            Statistics<?> chunkStatistics = Statistics.getStatsByType(dataType);
+            chunkStatisticsList.add(chunkStatistics);
+            // first page of the chunk
+            if (header.getNumOfPages() > 0) {
+              PageHeader pageHeader = this.readPageHeader(header.getDataType());
+              pageHeaders.add(pageHeader);
+              chunkStatistics.mergeStatistics(pageHeader.getStatistics());
+              pages.add(readData(-1, pageHeader.getCompressedSize()));
+            }
+            // middle pages, if any
+            for (int j = 1; j < header.getNumOfPages() - 1; j++) {
+              PageHeader pageHeader = this.readPageHeader(header.getDataType());
+              pageHeaders.add(pageHeader);
+              chunkStatistics.mergeStatistics(pageHeader.getStatistics());
+              pages.add(readData(-1, pageHeader.getCompressedSize()));
+            }
+            // last page, if the chunk has more than one page
+            if (header.getNumOfPages() > 1) {
+              PageHeader pageHeader = this.readPageHeader(header.getDataType());
+              pageHeaders.add(pageHeader);
+              chunkStatistics.mergeStatistics(pageHeader.getStatistics());
+              pages.add(readData(-1, pageHeader.getCompressedSize()));
+            }
+
+            currentChunkMetaData = new ChunkMetaData(header.getMeasurementID(), dataType,
+                fileOffsetOfChunk, chunkStatistics);
+            chunkMetaDataList.add(currentChunkMetaData);
+            pageHeadersList.add(pageHeaders);
+            pagesList.add(pages);
+            break;
+          case MetaMarker.CHUNK_GROUP_FOOTER:
+            ChunkGroupFooter chunkGroupFooter = this.readChunkGroupFooter();
+            String deviceID = chunkGroupFooter.getDeviceID();
+            long endOffsetOfChunkGroup = this.position();
+            ChunkGroupMetaData currentChunkGroup = new ChunkGroupMetaData(deviceID,
+                chunkMetaDataList,
+                startOffsetOfChunkGroup);
+            currentChunkGroup.setEndOffsetOfChunkGroup(endOffsetOfChunkGroup);
+            currentChunkGroup.setVersion(versionOfChunkGroup++);
+            newMetaData.add(currentChunkGroup);
+            tsFileIOWriter.startChunkGroup(deviceID);
+            for (int i = 0; i < chunkHeaders.size(); i++) {
+              TSDataType tsDataType = chunkHeaders.get(i).getDataType();
+              TSEncoding encodingType = chunkHeaders.get(i).getEncodingType();
+              CompressionType compressionType = chunkHeaders.get(i).getCompressionType();
+              ChunkHeader chunkHeader = chunkHeaders.get(i);
+              List<PageHeader> pageHeaderList = pageHeadersList.get(i);
+              List<ByteBuffer> pageList = pagesList.get(i);
+
+              if (schema.getMeasurementSchema(chunkHeader.getMeasurementID()) != null) {
+                ChunkWriterImpl chunkWriter = new ChunkWriterImpl(
+                    schema.getMeasurementSchema(chunkHeader.getMeasurementID()));
+                for (int j = 0; j < pageHeaderList.size(); j++) {
+                  if (encodingType.equals(TSEncoding.PLAIN)) {
+                    pageList.set(j, rewrite(pageList.get(j), tsDataType, compressionType,
+                        pageHeaderList.get(j)));
+                  }
+                  chunkWriter
+                      .writePageHeaderAndDataIntoBuff(pageList.get(j), pageHeaderList.get(j));
+                }
+                chunkWriter
+                    .writeAllPagesOfChunkToTsFile(tsFileIOWriter, chunkStatisticsList.get(i));
+              }
+            }
+            tsFileIOWriter.endChunkGroup(currentChunkGroup.getVersion());
+            chunkStatisticsList.clear();
+            chunkHeaders.clear();
+            pageHeadersList.clear();
+            pagesList.clear();
+            newChunkGroup = true;
+            break;
+
+          default:
+            // the disk file is corrupted, using this file may be dangerous
+            logger.error("Unrecognized marker detected, this file may be corrupted");
+            return false;
+        }
+      }
+      tsFileIOWriter.endFile(schema);
+      return true;
+    } catch (IOException | PageException e2) {
+      logger.info("TsFile upgrade process cannot proceed at position {} after {} chunk groups "
+          + "recovered, because: {}", this.position(), newMetaData.size(), e2.getMessage());
+      return false;
+    } finally {
+      if (tsFileInput != null) {
+        tsFileInput.close();
+      }
+      if (tsFileIOWriter != null) {
+        tsFileIOWriter.close();
+      }
+    }
+  }
+
+  static ByteBuffer rewrite(ByteBuffer page, TSDataType tsDataType,
+      CompressionType compressionType, PageHeader pageHeader) {
+    switch (compressionType) {
+      case UNCOMPRESSED:
+        break;
+      case SNAPPY:
+        SnappyUnCompressor snappyUnCompressor = new SnappyUnCompressor();
+        page = ByteBuffer.wrap(snappyUnCompressor.uncompress(page.array()));
+        break;
+      default:
+        throw new CompressionTypeNotSupportedException(compressionType.toString());
+    }
+    ByteBuffer modifiedPage = ByteBuffer.allocate(page.capacity());
+
+    int timeBufferLength = ReadWriteForEncodingUtils.readUnsignedVarInt(page);
+    ByteBuffer timeBuffer = page.slice();
+    ByteBuffer valueBuffer = page.slice();
+
+    timeBuffer.limit(timeBufferLength);
+    valueBuffer.position(timeBufferLength);
+    // PLAIN-encoded values in the old format are little-endian; they are re-encoded as big-endian below
+    valueBuffer.order(ByteOrder.LITTLE_ENDIAN);
+
+    ReadWriteForEncodingUtils.writeUnsignedVarInt(timeBufferLength, modifiedPage);
+    modifiedPage.put(timeBuffer);
+    modifiedPage.order(ByteOrder.BIG_ENDIAN);
+    switch (tsDataType) {
+      case BOOLEAN:
+        modifiedPage.put(valueBuffer);
+        break;
+      case INT32:
+        while (valueBuffer.remaining() > 0) {
+          modifiedPage.putInt(valueBuffer.getInt());
+        }
+        break;
+      case INT64:
+        while (valueBuffer.remaining() > 0) {
+          modifiedPage.putLong(valueBuffer.getLong());
+        }
+        break;
+      case FLOAT:
+        while (valueBuffer.remaining() > 0) {
+          modifiedPage.putFloat(valueBuffer.getFloat());
+        }
+        break;
+      case DOUBLE:
+        while (valueBuffer.remaining() > 0) {
+          modifiedPage.putDouble(valueBuffer.getDouble());
+        }
+        break;
+      case TEXT:
+        while (valueBuffer.remaining() > 0) {
+          int length = valueBuffer.getInt();
+          byte[] buf = new byte[length];
+          valueBuffer.get(buf, 0, buf.length);
+          modifiedPage.putInt(length);
+          modifiedPage.put(buf);
+        }
+        break;
+    }
+    switch (compressionType) {
+      case UNCOMPRESSED:
+        modifiedPage.flip();
+        break;
+      case SNAPPY:
+        pageHeader.setUncompressedSize(modifiedPage.array().length);
+        SnappyCompressor snappyCompressor = new SnappyCompressor();
+        try {
+          modifiedPage = ByteBuffer.wrap(snappyCompressor.compress(modifiedPage.array()));
+          pageHeader.setCompressedSize(modifiedPage.array().length);
+        } catch (IOException e) {
+          logger.error("failed to compress page as snappy", e);
+        }
+        break;
+      default:
+        throw new CompressionTypeNotSupportedException(compressionType.toString());
+    }
+    return modifiedPage;
+  }
+}
\ No newline at end of file
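
A minimal sketch of driving the upgrade tool above directly (the class name SingleFileUpgradeExample and the file paths are made up for illustration; in this commit the actual caller is UpgradeTool.upgradeOneTsfile in the next file):

    import java.io.IOException;

    import org.apache.iotdb.tsfile.tool.upgrade.TsfileUpgradeToolV0_8_0;

    public class SingleFileUpgradeExample {

      public static void main(String[] args) throws IOException {
        // placeholder paths for the old-version file and the upgraded output file
        String oldFile = "/path/to/old/root.group_1/1585000000000-101.tsfile";
        String newFile = "/path/to/new/root.group_1/1585000000000-101.tsfile";
        // the tool implements AutoCloseable, so try-with-resources closes the underlying input
        try (TsfileUpgradeToolV0_8_0 upgrader = new TsfileUpgradeToolV0_8_0(oldFile)) {
          // upgradeFile returns false if the old file is missing, incomplete or corrupted
          if (!upgrader.upgradeFile(newFile)) {
            System.out.println("upgrade did not complete for " + oldFile);
          }
        }
      }
    }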
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/tool/upgrade/UpgradeTool.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/tool/upgrade/UpgradeTool.java
new file mode 100644
index 0000000..076556c
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/tool/upgrade/UpgradeTool.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.tool.upgrade;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.commons.io.FileUtils;
+import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
+
+public class UpgradeTool {
+
+  /**
+   * upgrade all TsFiles under the specified dir
+   *
+   * @param dir the dir containing the TsFiles that need to be upgraded
+   * @param upgradeDir the dir that the upgraded TsFiles are written to
+   * @param threadNum the number of threads that perform offline upgrade tasks
+   */
+  public static void upgradeTsfiles(String dir, String upgradeDir, int threadNum)
+      throws IOException {
+    // traverse the dir recursively to find all TsFiles
+    File file = FSFactoryProducer.getFSFactory().getFile(dir);
+    Queue<File> tmp = new LinkedList<>();
+    tmp.add(file);
+    List<String> tsfiles = new ArrayList<>();
+    if (file.exists()) {
+      while (!tmp.isEmpty()) {
+        File tmpFile = tmp.poll();
+        File[] files = tmpFile.listFiles();
+        if (files == null) {
+          // listFiles() returns null when the entry is not a directory or an I/O error occurs
+          continue;
+        }
+        for (File file2 : files) {
+          if (file2.isDirectory()) {
+            tmp.add(file2);
+          } else {
+            if (file2.getName().endsWith(".tsfile")) {
+              tsfiles.add(file2.getAbsolutePath());
+            }
+            // copy all the resource files to the upgradeDir
+            if (file2.getName().endsWith(".resource")) {
+              File newFileName = FSFactoryProducer.getFSFactory()
+                  .getFile(file2.getAbsoluteFile().toString().replace(dir, upgradeDir));
+              if (!newFileName.getParentFile().exists()) {
+                newFileName.getParentFile().mkdirs();
+              }
+              newFileName.createNewFile();
+              FileUtils.copyFile(file2, newFileName);
+            }
+          }
+        }
+      }
+    }
+    // begin upgrade tsfiles
+    System.out.println(String.format(
+        "begin upgrading the data dir:%s, total num of TsFiles to be upgraded:%s",
+        dir, tsfiles.size()));
+    AtomicInteger dirUpgradeFileNum = new AtomicInteger(tsfiles.size());
+    ExecutorService offlineUpgradeThreadPool = Executors.newFixedThreadPool(threadNum);
+    // for every TsFile, submit an upgrade task to the pool
+    for (String tsfile : tsfiles) {
+      offlineUpgradeThreadPool.submit(() -> {
+        try {
+          upgradeOneTsfile(tsfile, tsfile.replace(dir, upgradeDir));
+          System.out.println(
+              String.format("upgrade file succeeded, file name:%s, remaining file num:%s", tsfile,
+                  dirUpgradeFileNum.decrementAndGet()));
+        } catch (Exception e) {
+          System.out.println(String.format("met an error when upgrading file:%s", tsfile));
+          e.printStackTrace();
+        }
+      });
+    }
+    offlineUpgradeThreadPool.shutdown();
+  }
+
+  /**
+   * upgrade a single tsfile
+   *
+   * @param tsfileName old version tsfile's absolute path
+   * @param updateFileName new version tsfile's absolute path
+   */
+  public static void upgradeOneTsfile(String tsfileName, String updateFileName) throws IOException {
+    TsfileUpgradeToolV0_8_0 updater = new TsfileUpgradeToolV0_8_0(tsfileName);
+    updater.upgradeFile(updateFileName);
+  }
+
+}
\ No newline at end of file
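
A caller sketch for the directory-level entry point above (the class name DirUpgradeExample, the directories and the thread count are placeholders):

    import java.io.IOException;

    import org.apache.iotdb.tsfile.tool.upgrade.UpgradeTool;

    public class DirUpgradeExample {

      public static void main(String[] args) throws IOException {
        // the old dir is scanned recursively: *.tsfile files are upgraded,
        // *.resource files are copied to the new dir unchanged
        UpgradeTool.upgradeTsfiles("/path/to/old/data1", "/path/to/new/data1", 4);
        // note: upgradeTsfiles only calls shutdown() on its internal thread pool,
        // so it may return before all submitted upgrade tasks have finished
      }
    }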