You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2020/03/31 02:53:15 UTC
[incubator-iotdb] 01/01: add the old upgrade tool back
This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch upgrade_tool
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 5b9fb3f47ba69ecd5cbd13bcb944518db18b62ee
Author: HTHou <hh...@outlook.com>
AuthorDate: Tue Mar 31 10:52:18 2020 +0800
add the old upgrade tool back
---
.../iotdb/db/tools/upgrade/OfflineUpgradeTool.java | 68 +++
.../tool/upgrade/TsfileUpgradeToolV0_8_0.java | 543 +++++++++++++++++++++
.../iotdb/tsfile/tool/upgrade/UpgradeTool.java | 108 ++++
3 files changed, 719 insertions(+)
diff --git a/server/src/main/java/org/apache/iotdb/db/tools/upgrade/OfflineUpgradeTool.java b/server/src/main/java/org/apache/iotdb/db/tools/upgrade/OfflineUpgradeTool.java
new file mode 100644
index 0000000..c63dfd6
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/tools/upgrade/OfflineUpgradeTool.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.tools.upgrade;
+
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
+import org.apache.iotdb.tsfile.tool.upgrade.UpgradeTool;
+
+/**
+ * Command line entry point of the offline tsfile upgrade.
+ *
+ * <p>The only argument is the path of a properties file that provides:
+ * <ul>
+ * <li>{@code old_version_data_dirs}: comma separated directories holding old-version tsfiles</li>
+ * <li>{@code new_version_data_dirs}: comma separated target directories, paired by index with
+ * the old ones</li>
+ * <li>{@code upgrade_thread_num}: number of threads used for each directory</li>
+ * </ul>
+ */
+public class OfflineUpgradeTool {
+
+  private static List<String> oldVersionTsfileDirs = new ArrayList<>();
+  private static List<String> newVersionTsfileDirs = new ArrayList<>();
+  private static int upgradeThreadNum;
+
+  /**
+   * Loads the upgrade configuration from {@code configPath} into the static fields. Any problem
+   * (missing file, unreadable file, missing or malformed property) is reported on stdout and
+   * terminates the JVM with exit code 1, so the upgrade never starts with a partial configuration.
+   *
+   * @param configPath path of the properties file
+   */
+  private static void loadProps(String configPath) {
+    Properties properties = new Properties();
+    // try-with-resources: the original never closed the config stream
+    try (InputStream inputStream = new FileInputStream(
+        FSFactoryProducer.getFSFactory().getFile(configPath))) {
+      properties.load(inputStream);
+    } catch (FileNotFoundException e) {
+      System.out.println(String.format("Fail to find config file:%s", configPath));
+      e.printStackTrace();
+      System.exit(1);
+    } catch (IOException e) {
+      System.out.println("Cannot load config file ");
+      e.printStackTrace();
+      System.exit(1);
+    }
+    addDirs(oldVersionTsfileDirs, requireProperty(properties, "old_version_data_dirs"));
+    addDirs(newVersionTsfileDirs, requireProperty(properties, "new_version_data_dirs"));
+    String threadNum = requireProperty(properties, "upgrade_thread_num");
+    try {
+      upgradeThreadNum = Integer.parseInt(threadNum);
+    } catch (NumberFormatException e) {
+      exitWithError(String.format("upgrade_thread_num is not an integer:%s", threadNum));
+    }
+    if (upgradeThreadNum <= 0) {
+      exitWithError(String.format("upgrade_thread_num must be positive:%s", threadNum));
+    }
+  }
+
+  /** Returns the trimmed value of {@code key}, or exits when it is missing or blank. */
+  private static String requireProperty(Properties properties, String key) {
+    String value = properties.getProperty(key);
+    if (value == null || value.trim().isEmpty()) {
+      exitWithError(String.format("Missing config item:%s", key));
+      return "";
+    }
+    return value.trim();
+  }
+
+  /** Splits a comma separated list and appends every non-blank, trimmed entry to {@code target}. */
+  private static void addDirs(List<String> target, String commaSeparatedDirs) {
+    for (String dir : commaSeparatedDirs.split(",")) {
+      String trimmed = dir.trim();
+      if (!trimmed.isEmpty()) {
+        target.add(trimmed);
+      }
+    }
+  }
+
+  /** Prints {@code message} and terminates the JVM with exit code 1. */
+  private static void exitWithError(String message) {
+    System.out.println(message);
+    System.exit(1);
+  }
+
+  /**
+   * Upgrades every configured old-version directory into the new-version directory that has the
+   * same index in the configuration.
+   *
+   * @param args {@code args[0]} is the path of the properties file
+   * @throws IOException if traversing or copying files of a data directory fails
+   */
+  public static void main(String[] args) throws IOException {
+    if (args.length < 1) {
+      exitWithError("Usage: OfflineUpgradeTool <config file path>");
+      return;
+    }
+    loadProps(args[0]);
+    // the two lists are paired by index, so a length mismatch would fail midway through the run
+    if (oldVersionTsfileDirs.size() != newVersionTsfileDirs.size()) {
+      exitWithError(String.format(
+          "old_version_data_dirs has %s entries but new_version_data_dirs has %s",
+          oldVersionTsfileDirs.size(), newVersionTsfileDirs.size()));
+      return;
+    }
+    for (int i = 0; i < oldVersionTsfileDirs.size(); i++) {
+      UpgradeTool.upgradeTsfiles(oldVersionTsfileDirs.get(i), newVersionTsfileDirs.get(i),
+          upgradeThreadNum);
+    }
+  }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/tool/upgrade/TsfileUpgradeToolV0_8_0.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/tool/upgrade/TsfileUpgradeToolV0_8_0.java
new file mode 100644
index 0000000..328dc3a
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/tool/upgrade/TsfileUpgradeToolV0_8_0.java
@@ -0,0 +1,543 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.tool.upgrade;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
+import org.apache.iotdb.tsfile.compress.ICompressor.SnappyCompressor;
+import org.apache.iotdb.tsfile.compress.IUnCompressor.SnappyUnCompressor;
+import org.apache.iotdb.tsfile.exception.compress.CompressionTypeNotSupportedException;
+import org.apache.iotdb.tsfile.exception.write.PageException;
+import org.apache.iotdb.tsfile.file.MetaMarker;
+import org.apache.iotdb.tsfile.file.footer.ChunkGroupFooter;
+import org.apache.iotdb.tsfile.file.header.ChunkHeader;
+import org.apache.iotdb.tsfile.file.header.PageHeader;
+import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetaData;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
+import org.apache.iotdb.tsfile.file.metadata.TsDeviceMetadata;
+import org.apache.iotdb.tsfile.file.metadata.TsDeviceMetadataIndex;
+import org.apache.iotdb.tsfile.file.metadata.TsFileMetaData;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
+import org.apache.iotdb.tsfile.read.reader.DefaultTsFileInput;
+import org.apache.iotdb.tsfile.read.reader.TsFileInput;
+import org.apache.iotdb.tsfile.utils.ReadWriteForEncodingUtils;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.apache.iotdb.tsfile.write.schema.Schema;
+import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TsfileUpgradeToolV0_8_0 implements AutoCloseable {
+
+ private static final Logger logger = LoggerFactory.getLogger(TsfileUpgradeToolV0_8_0.class);
+
+ // input over the old-version tsfile; opened in the constructor
+ private TsFileInput tsFileInput;
+ // offset and length of the serialized file metadata; both are set by loadMetadataSize
+ private long fileMetadataPos;
+ private int fileMetadataSize;
+ // one-byte scratch buffer reused by every readMarker call
+ private ByteBuffer markerBuffer = ByteBuffer.allocate(Byte.BYTES);
+ // path of the file being upgraded, as passed to the constructor
+ protected String file;
+
+ /**
+  * Opens {@code file} for upgrading and immediately loads the position and size of its file
+  * metadata, which leaves the reader positioned right after the head magic string (see
+  * {@link #loadMetadataSize(boolean)}).
+  *
+  * @param file path of the old-version data file
+  * @throws IOException if the file cannot be opened or its tail cannot be read
+  */
+ public TsfileUpgradeToolV0_8_0(String file) throws IOException {
+   this(file, true);
+ }
+
+ /**
+  * Opens {@code file} for upgrading.
+  *
+  * <p>If loading the metadata size fails, the freshly opened input is closed before the failure
+  * is rethrown. A failure of that close is attached to the original exception as a suppressed
+  * exception instead of replacing it.
+  *
+  * @param file path of the old-version data file
+  * @param loadMetadataSize whether to load the file metadata position and size right away
+  * @throws IOException if the file cannot be opened or its tail cannot be read
+  */
+ public TsfileUpgradeToolV0_8_0(String file, boolean loadMetadataSize) throws IOException {
+   this.file = file;
+   tsFileInput = new DefaultTsFileInput(Paths.get(file));
+   if (!loadMetadataSize) {
+     return;
+   }
+   try {
+     loadMetadataSize(false);
+   } catch (Throwable e) {
+     try {
+       tsFileInput.close();
+     } catch (IOException closeFailure) {
+       // keep the root cause; the close failure is secondary
+       e.addSuppressed(closeFailure);
+     }
+     throw e;
+   }
+ }
+
+ /**
+ * @param sealedWithNewMagic true when an old version tsfile sealed with new version MAGIC_STRING
+ */
+ public void loadMetadataSize(boolean sealedWithNewMagic) throws IOException {
+ ByteBuffer metadataSize = ByteBuffer.allocate(Integer.BYTES);
+ if (sealedWithNewMagic) {
+ tsFileInput.read(metadataSize,
+ tsFileInput.size() - TSFileConfig.MAGIC_STRING.getBytes().length - Integer.BYTES);
+ metadataSize.flip();
+ // read file metadata size and position
+ fileMetadataSize = ReadWriteIOUtils.readInt(metadataSize);
+ fileMetadataPos =
+ tsFileInput.size() - TSFileConfig.MAGIC_STRING.getBytes().length - Integer.BYTES
+ - fileMetadataSize;
+ } else {
+ tsFileInput.read(metadataSize,
+ tsFileInput.size() - TSFileConfig.OLD_MAGIC_STRING.length() - Integer.BYTES);
+ metadataSize.flip();
+ // read file metadata size and position
+ fileMetadataSize = ReadWriteIOUtils.readInt(metadataSize);
+ fileMetadataPos = tsFileInput.size() - TSFileConfig.OLD_MAGIC_STRING.length() - Integer.BYTES
+ - fileMetadataSize;
+ }
+ // skip the magic header
+ position(TSFileConfig.OLD_MAGIC_STRING.length());
+ }
+
+ /**
+  * Reads the last {@code OLD_MAGIC_STRING.length()} bytes of the file with a positional read and
+  * returns them as a string.
+  *
+  * @throws IOException if the tail of the file cannot be read
+  */
+ public String readTailMagic() throws IOException {
+   int magicLength = TSFileConfig.OLD_MAGIC_STRING.length();
+   long fileLength = tsFileInput.size();
+
+   ByteBuffer tail = ByteBuffer.allocate(magicLength);
+   tsFileInput.read(tail, fileLength - magicLength);
+   tail.flip();
+   return new String(tail.array());
+ }
+
+ /**
+  * Tells whether the file is a complete TsFile: it must be long enough to hold two magic
+  * strings, and the tail magic must equal the head magic.
+  *
+  * @throws IOException if the file cannot be read
+  */
+ public boolean isComplete() throws IOException {
+   if (tsFileInput.size() < TSFileConfig.OLD_MAGIC_STRING.length() * 2) {
+     return false;
+   }
+   return readTailMagic().equals(readHeadMagic());
+ }
+
+ /**
+  * Reads the head magic string with a positional read, i.e. via
+  * {@code readHeadMagic(false)}.
+  *
+  * @throws IOException if the head of the file cannot be read
+  */
+ public String readHeadMagic() throws IOException {
+   return readHeadMagic(false);
+ }
+
+ /**
+  * Reads the first {@code OLD_MAGIC_STRING.length()} bytes of the file as a string.
+  *
+  * @param movePosition when true the reader is moved to offset 0 and a sequential read is used;
+  * when false a positional read at offset 0 is used instead
+  * @throws IOException if the head of the file cannot be read
+  */
+ public String readHeadMagic(boolean movePosition) throws IOException {
+   ByteBuffer head = ByteBuffer.allocate(TSFileConfig.OLD_MAGIC_STRING.length());
+   if (!movePosition) {
+     tsFileInput.read(head, 0);
+   } else {
+     tsFileInput.position(0);
+     tsFileInput.read(head);
+   }
+   head.flip();
+   return new String(head.array());
+ }
+
+ /**
+  * Deserializes the old-format file metadata located at {@code fileMetadataPos}. The bytes are
+  * fetched through the explicit-position branch of {@code readData}.
+  *
+  * <p>Layout consumed, in order: device index entries (count, then name/index pairs),
+  * measurement schema entries (count, then name/schema pairs), one int that is skipped, and a
+  * boolean flag followed by the "created by" string when the flag is true.
+  *
+  * @return the metadata; the device index map and the measurement schema map are only set when
+  * their entry count is positive
+  * @throws IOException if the metadata bytes cannot be read completely
+  */
+ public TsFileMetaData readFileMetadata() throws IOException {
+   ByteBuffer metadataBytes = readData(fileMetadataPos, fileMetadataSize);
+   TsFileMetaData metadata = new TsFileMetaData();
+
+   int deviceCount = ReadWriteIOUtils.readInt(metadataBytes);
+   if (deviceCount > 0) {
+     Map<String, TsDeviceMetadataIndex> deviceIndexes = new HashMap<>();
+     for (int i = 0; i < deviceCount; i++) {
+       String device = ReadWriteIOUtils.readString(metadataBytes);
+       TsDeviceMetadataIndex deviceIndex = TsDeviceMetadataIndex.deserializeFrom(metadataBytes);
+       deviceIndexes.put(device, deviceIndex);
+     }
+     metadata.setDeviceIndexMap(deviceIndexes);
+   }
+
+   int measurementCount = ReadWriteIOUtils.readInt(metadataBytes);
+   if (measurementCount > 0) {
+     metadata.setMeasurementSchema(new HashMap<>());
+     for (int i = 0; i < measurementCount; i++) {
+       String measurement = ReadWriteIOUtils.readString(metadataBytes);
+       MeasurementSchema measurementSchema = MeasurementSchema.deserializeFrom(metadataBytes);
+       metadata.getMeasurementSchema().put(measurement, measurementSchema);
+     }
+   }
+
+   // skip the current version of file metadata
+   ReadWriteIOUtils.readInt(metadataBytes);
+
+   // NOTE(review): despite its name, a true result of readIsNull is treated here as
+   // "a created-by string follows" - confirm against the matching writer
+   boolean hasCreatedBy = ReadWriteIOUtils.readIsNull(metadataBytes);
+   if (hasCreatedBy) {
+     metadata.setCreatedBy(ReadWriteIOUtils.readString(metadataBytes));
+   }
+
+   return metadata;
+ }
+
+ /**
+ * this function does not modify the position of the file reader.
+ */
+ public TsDeviceMetadata readTsDeviceMetaData(TsDeviceMetadataIndex index) throws IOException {
+ return TsDeviceMetadata.deserializeFrom(readData(index.getOffset(), index.getLen()));
+ }
+
+ /**
+  * Deserializes a CHUNK_GROUP_FOOTER starting at the current position of the input. <br>
+  * This method is not threadsafe.
+  *
+  * <p>NOTE(review): the {@code true} flag presumably tells the deserializer that the marker byte
+  * has already been consumed (the caller reads it via readMarker) - confirm against
+  * ChunkGroupFooter.
+  *
+  * @return the footer that was read
+  * @throws IOException io error
+  */
+ public ChunkGroupFooter readChunkGroupFooter() throws IOException {
+   return ChunkGroupFooter.deserializeFrom(tsFileInput.wrapAsInputStream(), true);
+ }
+
+ /**
+  * Deserializes a CHUNK_HEADER starting at the current position of the input. <br>
+  * This method is not threadsafe.
+  *
+  * <p>NOTE(review): the {@code true} flag presumably tells the deserializer that the marker byte
+  * has already been consumed (the caller reads it via readMarker) - confirm against ChunkHeader.
+  *
+  * @return the chunk header that was read
+  * @throws IOException io error
+  */
+ public ChunkHeader readChunkHeader() throws IOException {
+   return ChunkHeader.deserializeFrom(tsFileInput.wrapAsInputStream(), true);
+ }
+
+ /**
+  * Deserializes a page header starting at the current position of the input. Not thread safe.
+  *
+  * @param type data type of the chunk the page belongs to
+  * @return the page header that was read
+  * @throws IOException io error
+  */
+ public PageHeader readPageHeader(TSDataType type) throws IOException {
+   return PageHeader.deserializeFrom(tsFileInput.wrapAsInputStream(), type);
+ }
+
+
+ /**
+  * Returns the current position of the underlying input.
+  *
+  * @throws IOException if the input cannot report its position
+  */
+ public long position() throws IOException {
+   return tsFileInput.position();
+ }
+
+ /**
+  * Moves the underlying input to {@code offset}.
+  *
+  * @param offset the new position of the input
+  * @throws IOException if the input cannot be repositioned
+  */
+ public void position(long offset) throws IOException {
+   tsFileInput.position(offset);
+ }
+
+ /**
+ * read one byte from the input. <br> this method is not thread safe
+ */
+ public byte readMarker() throws IOException {
+ markerBuffer.clear();
+ if (ReadWriteIOUtils.readAsPossible(tsFileInput, markerBuffer) == 0) {
+ throw new IOException("reach the end of the file.");
+ }
+ markerBuffer.flip();
+ return markerBuffer.get();
+ }
+
+ /**
+  * Closes the underlying input of the old-version file.
+  *
+  * @throws IOException if closing the input fails
+  */
+ @Override
+ public void close() throws IOException {
+   this.tsFileInput.close();
+ }
+
+ /**
+  * Returns the path of the file being upgraded, exactly as it was passed to the constructor.
+  */
+ public String getFileName() {
+   return file;
+ }
+
+ /**
+  * Reads exactly {@code size} bytes from tsFileInput.
+  *
+  * <p>With {@code position == -1} the bytes are read sequentially from the current position of
+  * the input; otherwise the read is issued at the given {@code position}.
+  *
+  * @param position the start position of data in the tsFileInput, or the current position if
+  * position = -1
+  * @param size the size of data that want to read
+  * @return a buffer flipped for reading that holds the requested bytes
+  * @throws IOException if fewer than {@code size} bytes could be read
+  */
+ private ByteBuffer readData(long position, int size) throws IOException {
+   ByteBuffer data = ByteBuffer.allocate(size);
+   long bytesRead = (position == -1)
+       ? ReadWriteIOUtils.readAsPossible(tsFileInput, data)
+       : ReadWriteIOUtils.readAsPossible(tsFileInput, data, position, size);
+   if (bytesRead != size) {
+     throw new IOException("reach the end of the data");
+   }
+   data.flip();
+   return data;
+ }
+
+ /**
+ * upgrade file and return the boolean value whether upgrade task completes
+ */
+ public boolean upgradeFile(String updateFileName) throws IOException {
+ File checkFile = FSFactoryProducer.getFSFactory().getFile(this.file);
+ long fileSize;
+ if (!checkFile.exists()) {
+ logger.error("the file to be updated does not exist, file path: {}", checkFile.getPath());
+ return false;
+ } else {
+ fileSize = checkFile.length();
+ }
+ File upgradeFile = FSFactoryProducer.getFSFactory().getFile(updateFileName);
+ if (!upgradeFile.getParentFile().exists()) {
+ upgradeFile.getParentFile().mkdirs();
+ }
+ upgradeFile.createNewFile();
+ TsFileIOWriter tsFileIOWriter = new TsFileIOWriter(upgradeFile);
+
+ List<ChunkHeader> chunkHeaders = new ArrayList<>();
+ List<List<PageHeader>> pageHeadersList = new ArrayList<>();
+ List<List<ByteBuffer>> pagesList = new ArrayList<>();
+ Schema schema = null;
+
+ String magic = readHeadMagic(true);
+ if (!magic.equals(TSFileConfig.OLD_MAGIC_STRING)) {
+ logger.error("the file's MAGIC STRING is incorrect, file path: {}", checkFile.getPath());
+ return false;
+ }
+
+ if (fileSize == TSFileConfig.OLD_MAGIC_STRING.length()) {
+ logger.error("the file only contains magic string, file path: {}", checkFile.getPath());
+ return false;
+ } else if (readTailMagic().equals(TSFileConfig.OLD_MAGIC_STRING)) {
+ loadMetadataSize(false);
+ TsFileMetaData tsFileMetaData = readFileMetadata();
+ schema = new Schema(tsFileMetaData.getMeasurementSchema());
+ } else {
+ loadMetadataSize(true);
+ TsFileMetaData tsFileMetaData = readFileMetadata();
+ schema = new Schema(tsFileMetaData.getMeasurementSchema());
+ }
+
+ ChunkMetaData currentChunkMetaData;
+ List<ChunkMetaData> chunkMetaDataList = null;
+ long startOffsetOfChunkGroup = 0;
+ boolean newChunkGroup = true;
+ long versionOfChunkGroup = 0;
+ List<ChunkGroupMetaData> newMetaData = new ArrayList<>();
+ List<Statistics<?>> chunkStatisticsList = new ArrayList<>();
+
+ boolean goon = true;
+ byte marker;
+ try {
+ while (goon && (marker = this.readMarker()) != MetaMarker.SEPARATOR) {
+ switch (marker) {
+ case MetaMarker.CHUNK_HEADER:
+ // this is the first chunk of a new ChunkGroup.
+ if (newChunkGroup) {
+ newChunkGroup = false;
+ chunkMetaDataList = new ArrayList<>();
+ startOffsetOfChunkGroup = this.position() - 1;
+ }
+
+ long fileOffsetOfChunk = this.position() - 1;
+ ChunkHeader header = this.readChunkHeader();
+ chunkHeaders.add(header);
+ List<PageHeader> pageHeaders = new ArrayList<>();
+ List<ByteBuffer> pages = new ArrayList<>();
+ TSDataType dataType = header.getDataType();
+ Statistics<?> chunkStatistics = Statistics.getStatsByType(dataType);
+ chunkStatisticsList.add(chunkStatistics);
+ if (header.getNumOfPages() > 0) {
+ PageHeader pageHeader = this.readPageHeader(header.getDataType());
+ pageHeaders.add(pageHeader);
+ chunkStatistics.mergeStatistics(pageHeader.getStatistics());
+ pages.add(readData(-1, pageHeader.getCompressedSize()));
+ }
+ for (int j = 1; j < header.getNumOfPages() - 1; j++) {
+ PageHeader pageHeader = this.readPageHeader(header.getDataType());
+ pageHeaders.add(pageHeader);
+ chunkStatistics.mergeStatistics(pageHeader.getStatistics());
+ pages.add(readData(-1, pageHeader.getCompressedSize()));
+ }
+ if (header.getNumOfPages() > 1) {
+ PageHeader pageHeader = this.readPageHeader(header.getDataType());
+ pageHeaders.add(pageHeader);
+ chunkStatistics.mergeStatistics(pageHeader.getStatistics());
+ pages.add(readData(-1, pageHeader.getCompressedSize()));
+ }
+
+ currentChunkMetaData = new ChunkMetaData(header.getMeasurementID(), dataType,
+ fileOffsetOfChunk, chunkStatistics);
+ chunkMetaDataList.add(currentChunkMetaData);
+ pageHeadersList.add(pageHeaders);
+ pagesList.add(pages);
+ break;
+ case MetaMarker.CHUNK_GROUP_FOOTER:
+ ChunkGroupFooter chunkGroupFooter = this.readChunkGroupFooter();
+ String deviceID = chunkGroupFooter.getDeviceID();
+ long endOffsetOfChunkGroup = this.position();
+ ChunkGroupMetaData currentChunkGroup = new ChunkGroupMetaData(deviceID,
+ chunkMetaDataList,
+ startOffsetOfChunkGroup);
+ currentChunkGroup.setEndOffsetOfChunkGroup(endOffsetOfChunkGroup);
+ currentChunkGroup.setVersion(versionOfChunkGroup++);
+ newMetaData.add(currentChunkGroup);
+ tsFileIOWriter.startChunkGroup(deviceID);
+ for (int i = 0; i < chunkHeaders.size(); i++) {
+ TSDataType tsDataType = chunkHeaders.get(i).getDataType();
+ TSEncoding encodingType = chunkHeaders.get(i).getEncodingType();
+ CompressionType compressionType = chunkHeaders.get(i).getCompressionType();
+ ChunkHeader chunkHeader = chunkHeaders.get(i);
+ List<PageHeader> pageHeaderList = pageHeadersList.get(i);
+ List<ByteBuffer> pageList = pagesList.get(i);
+
+ if (schema.getMeasurementSchema(chunkHeader.getMeasurementID()) != null) {
+ ChunkWriterImpl chunkWriter = new ChunkWriterImpl(
+ schema.getMeasurementSchema(chunkHeader.getMeasurementID()));
+ for (int j = 0; j < pageHeaderList.size(); j++) {
+ if (encodingType.equals(TSEncoding.PLAIN)) {
+ pageList.set(j, rewrite(pageList.get(j), tsDataType, compressionType,
+ pageHeaderList.get(j)));
+ }
+ chunkWriter
+ .writePageHeaderAndDataIntoBuff(pageList.get(j), pageHeaderList.get(j));
+ }
+ chunkWriter
+ .writeAllPagesOfChunkToTsFile(tsFileIOWriter, chunkStatisticsList.get(i));
+ }
+ }
+ tsFileIOWriter.endChunkGroup(currentChunkGroup.getVersion());
+ chunkStatisticsList.clear();
+ chunkHeaders.clear();
+ pageHeadersList.clear();
+ pagesList.clear();
+ newChunkGroup = true;
+ break;
+
+ default:
+ // the disk file is corrupted, using this file may be dangerous
+ logger.error("Unrecognized marker detected, this file may be corrupted");
+ return false;
+ }
+ }
+ tsFileIOWriter.endFile(schema);
+ return true;
+ } catch (IOException | PageException e2) {
+ logger.info("TsFile upgrade process cannot proceed at position {} after {} chunk groups "
+ + "recovered, because : {}", this.position(), newMetaData.size(), e2.getMessage());
+ return false;
+ } finally {
+ if (tsFileInput != null) {
+ tsFileInput.close();
+ }
+ if (tsFileIOWriter != null) {
+ tsFileIOWriter.close();
+ }
+ }
+ }
+
+ /**
+  * Rewrites one page: the time column is copied unchanged and every value is read in
+  * little-endian byte order and written back in big-endian byte order. Compressed pages are
+  * uncompressed first and compressed again afterwards; for SNAPPY pages the sizes stored in
+  * {@code pageHeader} are updated to match the rewritten data.
+  *
+  * <p>Failures are reported by exception instead of returning a partially rewritten page, since
+  * such a page would be written into the upgraded file as if it were valid.
+  *
+  * @param page the page data as stored in the old file
+  * @param tsDataType data type of the values in the page
+  * @param compressionType compression of {@code page}
+  * @param pageHeader header of the page; updated in place for SNAPPY pages
+  * @return the rewritten page, ready to be read from its current position
+  * @throws CompressionTypeNotSupportedException if the compression is neither UNCOMPRESSED nor
+  * SNAPPY
+  * @throws UnsupportedOperationException if the data type has no conversion rule
+  * @throws java.io.UncheckedIOException if compressing the rewritten page fails
+  */
+ static ByteBuffer rewrite(ByteBuffer page, TSDataType tsDataType,
+     CompressionType compressionType, PageHeader pageHeader) {
+   switch (compressionType) {
+     case UNCOMPRESSED:
+       break;
+     case SNAPPY:
+       SnappyUnCompressor snappyUnCompressor = new SnappyUnCompressor();
+       page = ByteBuffer.wrap(snappyUnCompressor.uncompress(page.array()));
+       break;
+     default:
+       throw new CompressionTypeNotSupportedException(compressionType.toString());
+   }
+   ByteBuffer modifiedPage = ByteBuffer.allocate(page.capacity());
+
+   // the page starts with the length of the time column, followed by times and then values
+   int timeBufferLength = ReadWriteForEncodingUtils.readUnsignedVarInt(page);
+   ByteBuffer timeBuffer = page.slice();
+   ByteBuffer valueBuffer = page.slice();
+
+   timeBuffer.limit(timeBufferLength);
+   valueBuffer.position(timeBufferLength);
+   valueBuffer.order(ByteOrder.LITTLE_ENDIAN);
+
+   ReadWriteForEncodingUtils.writeUnsignedVarInt(timeBufferLength, modifiedPage);
+   modifiedPage.put(timeBuffer);
+   modifiedPage.order(ByteOrder.BIG_ENDIAN);
+   switch (tsDataType) {
+     case BOOLEAN:
+       // single bytes have no byte order
+       modifiedPage.put(valueBuffer);
+       break;
+     case INT32:
+       while (valueBuffer.remaining() > 0) {
+         modifiedPage.putInt(valueBuffer.getInt());
+       }
+       break;
+     case INT64:
+       while (valueBuffer.remaining() > 0) {
+         modifiedPage.putLong(valueBuffer.getLong());
+       }
+       break;
+     case FLOAT:
+       while (valueBuffer.remaining() > 0) {
+         modifiedPage.putFloat(valueBuffer.getFloat());
+       }
+       break;
+     case DOUBLE:
+       while (valueBuffer.remaining() > 0) {
+         modifiedPage.putDouble(valueBuffer.getDouble());
+       }
+       break;
+     case TEXT:
+       // only the length prefix depends on byte order; the text bytes are copied as they are
+       while (valueBuffer.remaining() > 0) {
+         int length = valueBuffer.getInt();
+         byte[] buf = new byte[length];
+         valueBuffer.get(buf, 0, buf.length);
+         modifiedPage.putInt(length);
+         modifiedPage.put(buf);
+       }
+       break;
+     default:
+       // falling through silently would drop the whole value column of the page
+       throw new UnsupportedOperationException(
+           "unsupported data type for page rewrite: " + tsDataType);
+   }
+   switch (compressionType) {
+     case UNCOMPRESSED:
+       modifiedPage.flip();
+       break;
+     case SNAPPY:
+       pageHeader.setUncompressedSize(modifiedPage.array().length);
+       SnappyCompressor snappyCompressor = new SnappyCompressor();
+       try {
+         modifiedPage = ByteBuffer.wrap(snappyCompressor.compress(modifiedPage.array()));
+         pageHeader.setCompressedSize(modifiedPage.array().length);
+       } catch (IOException e) {
+         logger.error("failed to compress page as snappy", e);
+         // returning here would hand back an unflipped, uncompressed buffer whose header
+         // still carries the old compressed size
+         throw new java.io.UncheckedIOException("failed to compress page as snappy", e);
+       }
+       break;
+     default:
+       throw new CompressionTypeNotSupportedException(compressionType.toString());
+   }
+   return modifiedPage;
+ }
+}
\ No newline at end of file
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/tool/upgrade/UpgradeTool.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/tool/upgrade/UpgradeTool.java
new file mode 100644
index 0000000..076556c
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/tool/upgrade/UpgradeTool.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.tool.upgrade;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.commons.io.FileUtils;
+import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
+
+/**
+ * Offline upgrade of all tsfiles below a data directory into a separate target directory.
+ */
+public class UpgradeTool {
+
+  /**
+   * Upgrades all tsfiles in the specific dir. Files ending with {@code .resource} are copied to
+   * the same relative location below {@code upgradeDir}; files ending with {@code .tsfile} are
+   * upgraded into the same relative location by a fixed thread pool.
+   *
+   * <p>The pool is shut down after all tasks have been submitted but is not awaited, so this
+   * method may return before the upgrade tasks have finished.
+   *
+   * @param dir tsfile dir which needs to be upgraded
+   * @param upgradeDir tsfile dir after upgraded
+   * @param threadNum num of threads that perform offline upgrade tasks
+   * @throws IOException if a resource file cannot be copied or would be copied onto itself
+   */
+  public static void upgradeTsfiles(String dir, String upgradeDir, int threadNum)
+      throws IOException {
+    //Traverse to find all tsfiles
+    File rootDir = FSFactoryProducer.getFSFactory().getFile(dir);
+    List<String> tsfiles = new ArrayList<>();
+    if (rootDir.exists()) {
+      Queue<File> pendingDirs = new LinkedList<>();
+      pendingDirs.add(rootDir);
+      while (!pendingDirs.isEmpty()) {
+        File[] children = pendingDirs.poll().listFiles();
+        if (children == null) {
+          // listFiles returns null for a non-directory or when listing fails
+          continue;
+        }
+        for (File child : children) {
+          String childName = child.getName();
+          if (child.isDirectory()) {
+            pendingDirs.add(child);
+          } else if (childName.endsWith(".tsfile")) {
+            tsfiles.add(child.getAbsolutePath());
+          } else if (childName.endsWith(".resource")) {
+            // copy all the resource files to the upgradeDir
+            File target = FSFactoryProducer.getFSFactory()
+                .getFile(toUpgradePath(child.getAbsolutePath(), rootDir, dir, upgradeDir));
+            File targetParent = target.getParentFile();
+            if (targetParent != null && !targetParent.exists()) {
+              targetParent.mkdirs();
+            }
+            FileUtils.copyFile(child, target);
+          }
+        }
+      }
+    }
+    // begin upgrade tsfiles
+    System.out.println(String.format(
+        "begin upgrade the data dir:%s, the total num of the tsfiles that need to be upgraded:%s",
+        dir, tsfiles.size()));
+    AtomicInteger dirUpgradeFileNum = new AtomicInteger(tsfiles.size());
+    ExecutorService offlineUpgradeThreadPool = Executors.newFixedThreadPool(threadNum);
+    //for every tsfile,do upgrade operation
+    for (String tsfile : tsfiles) {
+      offlineUpgradeThreadPool.submit(() -> {
+        try {
+          upgradeOneTsfile(tsfile, toUpgradePath(tsfile, rootDir, dir, upgradeDir));
+          System.out.println(
+              String.format("upgrade file success, file name:%s, remaining file num:%s", tsfile,
+                  dirUpgradeFileNum.decrementAndGet()));
+        } catch (Exception e) {
+          System.out.println(String.format("meet error when upgrade file:%s", tsfile));
+          e.printStackTrace();
+        }
+      });
+    }
+    offlineUpgradeThreadPool.shutdown();
+  }
+
+  /**
+   * Maps the absolute path of a file below {@code rootDir} to the matching path below
+   * {@code upgradeDir}.
+   *
+   * <p>The mapping strips the absolute path of the root directory, so it also works when
+   * {@code dir} was given as a relative path. Plain text replacement of {@code dir} is only kept
+   * as a fallback for paths that do not start with the root path.
+   *
+   * @throws IOException if the computed target is the source itself, which would make the
+   * upgrade overwrite its own input
+   */
+  private static String toUpgradePath(String sourcePath, File rootDir, String dir,
+      String upgradeDir) throws IOException {
+    String rootPath = rootDir.getAbsolutePath();
+    String targetPath;
+    if (sourcePath.startsWith(rootPath)) {
+      String relativePath = sourcePath.substring(rootPath.length());
+      if (!relativePath.startsWith(File.separator)) {
+        // happens when the root path itself ends with a separator, e.g. a file system root
+        relativePath = File.separator + relativePath;
+      }
+      targetPath = upgradeDir + relativePath;
+    } else {
+      targetPath = sourcePath.replace(dir, upgradeDir);
+    }
+    if (targetPath.equals(sourcePath)) {
+      throw new IOException(String.format(
+          "upgrade target is the same as the source file:%s", sourcePath));
+    }
+    return targetPath;
+  }
+
+  /**
+   * upgrade a single tsfile
+   *
+   * @param tsfileName old version tsfile's absolute path
+   * @param updateFileName new version tsfile's absolute path
+   * @throws IOException if the old file cannot be read, the new file cannot be written, or the
+   * upgrade reports that it did not complete
+   */
+  public static void upgradeOneTsfile(String tsfileName, String updateFileName) throws IOException {
+    try (TsfileUpgradeToolV0_8_0 updater = new TsfileUpgradeToolV0_8_0(tsfileName)) {
+      // upgradeFile signals most failures through its return value, not through an exception
+      if (!updater.upgradeFile(updateFileName)) {
+        throw new IOException(String.format("failed to upgrade file:%s", tsfileName));
+      }
+    }
+  }
+
+}
\ No newline at end of file