You are viewing a plain-text version of this content; the canonical (hyperlinked) version is available in the mailing-list archive.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2021/08/10 00:29:45 UTC
[iotdb] branch master updated: [IOTDB-1546] Optimize the Upgrade/Rewrite Tool rewrite logic to reduce the temp memory cost (#3701)
This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 4884dc8 [IOTDB-1546] Optimize the Upgrade/Rewrite Tool rewrite logic to reduce the temp memory cost (#3701)
4884dc8 is described below
commit 4884dc87baaf3ca7126dd297defece7da8a6ce97
Author: Haonan <hh...@outlook.com>
AuthorDate: Tue Aug 10 08:29:18 2021 +0800
[IOTDB-1546] Optimize the Upgrade/Rewrite Tool rewrite logic to reduce the temp memory cost (#3701)
---
.../apache/iotdb/db/tools/TsFileRewriteTool.java | 148 ++++++++------------
.../db/tools/upgrade/TsFileOnlineUpgradeTool.java | 151 ++++++++++++---------
2 files changed, 150 insertions(+), 149 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/tools/TsFileRewriteTool.java b/server/src/main/java/org/apache/iotdb/db/tools/TsFileRewriteTool.java
index c52457b..60c42ea 100644
--- a/server/src/main/java/org/apache/iotdb/db/tools/TsFileRewriteTool.java
+++ b/server/src/main/java/org/apache/iotdb/db/tools/TsFileRewriteTool.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.tools;
import org.apache.iotdb.db.engine.StorageEngine;
@@ -44,7 +45,6 @@ import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.v2.read.TsFileSequenceReaderForV2;
import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl;
import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
-import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
@@ -159,15 +159,19 @@ public class TsFileRewriteTool implements AutoCloseable {
int headerLength = TSFileConfig.MAGIC_STRING.getBytes().length + Byte.BYTES;
reader.position(headerLength);
// start to scan chunks and chunkGroups
- List<List<PageHeader>> pageHeadersInChunkGroup = new ArrayList<>();
- List<List<ByteBuffer>> pageDataInChunkGroup = new ArrayList<>();
- List<List<Boolean>> needToDecodeInfoInChunkGroup = new ArrayList<>();
byte marker;
- List<IMeasurementSchema> measurementSchemaList = new ArrayList<>();
- String lastChunkGroupDeviceId = null;
+
+ String deviceId = null;
+ boolean firstChunkInChunkGroup = true;
try {
while ((marker = reader.readMarker()) != MetaMarker.SEPARATOR) {
switch (marker) {
+ case MetaMarker.CHUNK_GROUP_HEADER:
+ ChunkGroupHeader chunkGroupHeader = reader.readChunkGroupHeader();
+ deviceId = chunkGroupHeader.getDeviceID();
+ firstChunkInChunkGroup = true;
+ endChunkGroup();
+ break;
case MetaMarker.CHUNK_HEADER:
case MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER:
ChunkHeader header = reader.readChunkHeader(marker);
@@ -177,7 +181,6 @@ public class TsFileRewriteTool implements AutoCloseable {
header.getDataType(),
header.getEncodingType(),
header.getCompressionType());
- measurementSchemaList.add(measurementSchema);
TSDataType dataType = header.getDataType();
TSEncoding encoding = header.getEncodingType();
List<PageHeader> pageHeadersInChunk = new ArrayList<>();
@@ -195,26 +198,14 @@ public class TsFileRewriteTool implements AutoCloseable {
dataInChunk.add(pageData);
dataSize -= pageHeader.getSerializedPageSize();
}
- pageHeadersInChunkGroup.add(pageHeadersInChunk);
- pageDataInChunkGroup.add(dataInChunk);
- needToDecodeInfoInChunkGroup.add(needToDecodeInfo);
- break;
- case MetaMarker.CHUNK_GROUP_HEADER:
- ChunkGroupHeader chunkGroupHeader = reader.readChunkGroupHeader();
- String deviceId = chunkGroupHeader.getDeviceID();
- if (lastChunkGroupDeviceId != null && !measurementSchemaList.isEmpty()) {
- rewrite(
- lastChunkGroupDeviceId,
- measurementSchemaList,
- pageHeadersInChunkGroup,
- pageDataInChunkGroup,
- needToDecodeInfoInChunkGroup);
- pageHeadersInChunkGroup.clear();
- pageDataInChunkGroup.clear();
- measurementSchemaList.clear();
- needToDecodeInfoInChunkGroup.clear();
- }
- lastChunkGroupDeviceId = deviceId;
+ reWriteChunk(
+ deviceId,
+ firstChunkInChunkGroup,
+ measurementSchema,
+ pageHeadersInChunk,
+ dataInChunk,
+ needToDecodeInfo);
+ firstChunkInChunkGroup = false;
break;
case MetaMarker.OPERATION_INDEX_RANGE:
reader.readPlanIndex();
@@ -239,19 +230,7 @@ public class TsFileRewriteTool implements AutoCloseable {
MetaMarker.handleUnexpectedMarker(marker);
}
}
-
- if (!measurementSchemaList.isEmpty()) {
- rewrite(
- lastChunkGroupDeviceId,
- measurementSchemaList,
- pageHeadersInChunkGroup,
- pageDataInChunkGroup,
- needToDecodeInfoInChunkGroup);
- pageHeadersInChunkGroup.clear();
- pageDataInChunkGroup.clear();
- measurementSchemaList.clear();
- needToDecodeInfoInChunkGroup.clear();
- }
+ endChunkGroup();
// close upgraded tsFiles and generate resources for them
for (TsFileIOWriter tsFileIOWriter : partitionWriterMap.values()) {
rewrittenResources.add(endFileAndGenerateResource(tsFileIOWriter));
@@ -302,44 +281,43 @@ public class TsFileRewriteTool implements AutoCloseable {
}
/**
- * This method is for rewriting the ChunkGroup which data is in the different time partitions. In
- * this case, we have to decode the data to points, and then rewrite the data points to different
+ * This method is for rewriting the Chunk which data is in the different time partitions. In this
+ * case, we have to decode the data to points, and then rewrite the data points to different
* chunkWriters, finally write chunks to their own upgraded TsFiles.
*/
- protected void rewrite(
+ protected void reWriteChunk(
String deviceId,
- List<IMeasurementSchema> schemas,
- List<List<PageHeader>> pageHeadersInChunkGroup,
- List<List<ByteBuffer>> dataInChunkGroup,
- List<List<Boolean>> needToDecodeInfoInChunkGroup)
+ boolean firstChunkInChunkGroup,
+ MeasurementSchema schema,
+ List<PageHeader> pageHeadersInChunk,
+ List<ByteBuffer> pageDataInChunk,
+ List<Boolean> needToDecodeInfoInChunk)
throws IOException, PageException {
- Map<Long, Map<IMeasurementSchema, ChunkWriterImpl>> chunkWritersInChunkGroup = new HashMap<>();
- for (int i = 0; i < schemas.size(); i++) {
- IMeasurementSchema schema = schemas.get(i);
- List<ByteBuffer> pageDataInChunk = dataInChunkGroup.get(i);
- List<PageHeader> pageHeadersInChunk = pageHeadersInChunkGroup.get(i);
- List<Boolean> needToDecodeInfoInChunk = needToDecodeInfoInChunkGroup.get(i);
- valueDecoder = Decoder.getDecoderByType(schema.getEncodingType(), schema.getType());
- for (int j = 0; j < pageDataInChunk.size(); j++) {
- if (Boolean.TRUE.equals(needToDecodeInfoInChunk.get(j))) {
- decodeAndWritePageInToFiles(schema, pageDataInChunk.get(j), chunkWritersInChunkGroup);
- } else {
- writePageInToFile(
- schema, pageHeadersInChunk.get(j), pageDataInChunk.get(j), chunkWritersInChunkGroup);
- }
+ valueDecoder = Decoder.getDecoderByType(schema.getEncodingType(), schema.getType());
+ Map<Long, ChunkWriterImpl> partitionChunkWriterMap = new HashMap<>();
+ for (int i = 0; i < pageDataInChunk.size(); i++) {
+ if (Boolean.TRUE.equals(needToDecodeInfoInChunk.get(i))) {
+ decodeAndWritePage(schema, pageDataInChunk.get(i), partitionChunkWriterMap);
+ } else {
+ writePage(
+ schema, pageHeadersInChunk.get(i), pageDataInChunk.get(i), partitionChunkWriterMap);
}
}
-
- for (Entry<Long, Map<IMeasurementSchema, ChunkWriterImpl>> entry :
- chunkWritersInChunkGroup.entrySet()) {
+ for (Entry<Long, ChunkWriterImpl> entry : partitionChunkWriterMap.entrySet()) {
long partitionId = entry.getKey();
TsFileIOWriter tsFileIOWriter = partitionWriterMap.get(partitionId);
- tsFileIOWriter.startChunkGroup(deviceId);
- // write chunks to their own upgraded tsFiles
- for (IChunkWriter chunkWriter : entry.getValue().values()) {
- chunkWriter.writeToFileWriter(tsFileIOWriter);
+ if (firstChunkInChunkGroup) {
+ tsFileIOWriter.startChunkGroup(deviceId);
}
- tsFileIOWriter.endChunkGroup();
+ // write chunks to their own upgraded tsFiles
+ IChunkWriter chunkWriter = entry.getValue();
+ chunkWriter.writeToFileWriter(tsFileIOWriter);
+ }
+ }
+
+ protected void endChunkGroup() throws IOException {
+ for (TsFileIOWriter tsFileIoWriter : partitionWriterMap.values()) {
+ tsFileIoWriter.endChunkGroup();
}
}
@@ -381,46 +359,42 @@ public class TsFileRewriteTool implements AutoCloseable {
});
}
- protected void writePageInToFile(
- IMeasurementSchema schema,
+ protected void writePage(
+ MeasurementSchema schema,
PageHeader pageHeader,
ByteBuffer pageData,
- Map<Long, Map<IMeasurementSchema, ChunkWriterImpl>> chunkWritersInChunkGroup)
+ Map<Long, ChunkWriterImpl> partitionChunkWriterMap)
throws PageException {
long partitionId = StorageEngine.getTimePartition(pageHeader.getStartTime());
getOrDefaultTsFileIOWriter(oldTsFile, partitionId);
- Map<IMeasurementSchema, ChunkWriterImpl> chunkWriters =
- chunkWritersInChunkGroup.getOrDefault(partitionId, new HashMap<>());
- ChunkWriterImpl chunkWriter = chunkWriters.getOrDefault(schema, new ChunkWriterImpl(schema));
+ ChunkWriterImpl chunkWriter =
+ partitionChunkWriterMap.computeIfAbsent(partitionId, v -> new ChunkWriterImpl(schema));
chunkWriter.writePageHeaderAndDataIntoBuff(pageData, pageHeader);
- chunkWriters.put(schema, chunkWriter);
- chunkWritersInChunkGroup.put(partitionId, chunkWriters);
}
- protected void decodeAndWritePageInToFiles(
- IMeasurementSchema schema,
+ protected void decodeAndWritePage(
+ MeasurementSchema schema,
ByteBuffer pageData,
- Map<Long, Map<IMeasurementSchema, ChunkWriterImpl>> chunkWritersInChunkGroup)
+ Map<Long, ChunkWriterImpl> partitionChunkWriterMap)
throws IOException {
valueDecoder.reset();
PageReader pageReader =
new PageReader(pageData, schema.getType(), valueDecoder, defaultTimeDecoder, null);
BatchData batchData = pageReader.getAllSatisfiedPageData();
- rewritePageIntoFiles(batchData, schema, chunkWritersInChunkGroup);
+ rewritePageIntoFiles(batchData, schema, partitionChunkWriterMap);
}
protected void rewritePageIntoFiles(
BatchData batchData,
- IMeasurementSchema schema,
- Map<Long, Map<IMeasurementSchema, ChunkWriterImpl>> chunkWritersInChunkGroup) {
+ MeasurementSchema schema,
+ Map<Long, ChunkWriterImpl> partitionChunkWriterMap) {
while (batchData.hasCurrent()) {
long time = batchData.currentTime();
Object value = batchData.currentValue();
long partitionId = StorageEngine.getTimePartition(time);
- Map<IMeasurementSchema, ChunkWriterImpl> chunkWriters =
- chunkWritersInChunkGroup.getOrDefault(partitionId, new HashMap<>());
- ChunkWriterImpl chunkWriter = chunkWriters.getOrDefault(schema, new ChunkWriterImpl(schema));
+ ChunkWriterImpl chunkWriter =
+ partitionChunkWriterMap.computeIfAbsent(partitionId, v -> new ChunkWriterImpl(schema));
getOrDefaultTsFileIOWriter(oldTsFile, partitionId);
switch (schema.getType()) {
case INT32:
@@ -446,8 +420,6 @@ public class TsFileRewriteTool implements AutoCloseable {
String.format("Data type %s is not supported.", schema.getType()));
}
batchData.next();
- chunkWriters.put(schema, chunkWriter);
- chunkWritersInChunkGroup.put(partitionId, chunkWriters);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/tools/upgrade/TsFileOnlineUpgradeTool.java b/server/src/main/java/org/apache/iotdb/db/tools/upgrade/TsFileOnlineUpgradeTool.java
index 8b04bba..0198844 100644
--- a/server/src/main/java/org/apache/iotdb/db/tools/upgrade/TsFileOnlineUpgradeTool.java
+++ b/server/src/main/java/org/apache/iotdb/db/tools/upgrade/TsFileOnlineUpgradeTool.java
@@ -36,7 +36,6 @@ import org.apache.iotdb.tsfile.read.common.BatchData;
import org.apache.iotdb.tsfile.v2.read.TsFileSequenceReaderForV2;
import org.apache.iotdb.tsfile.v2.read.reader.page.PageReaderV2;
import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl;
-import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
@@ -93,70 +92,99 @@ public class TsFileOnlineUpgradeTool extends TsFileRewriteTool {
TSFileConfig.MAGIC_STRING.getBytes().length
+ TSFileConfig.VERSION_NUMBER_V2.getBytes().length;
reader.position(headerLength);
- List<List<PageHeader>> pageHeadersInChunkGroup = new ArrayList<>();
- List<List<ByteBuffer>> pageDataInChunkGroup = new ArrayList<>();
- List<List<Boolean>> needToDecodeInfoInChunkGroup = new ArrayList<>();
byte marker;
- List<IMeasurementSchema> measurementSchemaList = new ArrayList<>();
+ long firstChunkPositionInChunkGroup = headerLength;
+ boolean firstChunkInChunkGroup = true;
+ String deviceId = null;
+ boolean skipReadingChunk = true;
try {
while ((marker = reader.readMarker()) != MetaMarker.SEPARATOR) {
switch (marker) {
case MetaMarker.CHUNK_HEADER:
- ChunkHeader header = ((TsFileSequenceReaderForV2) reader).readChunkHeader();
- IMeasurementSchema measurementSchema =
- new MeasurementSchema(
- header.getMeasurementID(),
- header.getDataType(),
- header.getEncodingType(),
- header.getCompressionType());
- measurementSchemaList.add(measurementSchema);
- TSDataType dataType = header.getDataType();
- TSEncoding encoding = header.getEncodingType();
- List<PageHeader> pageHeadersInChunk = new ArrayList<>();
- List<ByteBuffer> dataInChunk = new ArrayList<>();
- List<Boolean> needToDecodeInfo = new ArrayList<>();
- int dataSize = header.getDataSize();
- while (dataSize > 0) {
- // a new Page
- PageHeader pageHeader = ((TsFileSequenceReaderForV2) reader).readPageHeader(dataType);
- boolean needToDecode = checkIfNeedToDecode(dataType, encoding, pageHeader);
- needToDecodeInfo.add(needToDecode);
- ByteBuffer pageData =
- !needToDecode
- ? ((TsFileSequenceReaderForV2) reader).readCompressedPage(pageHeader)
- : reader.readPage(pageHeader, header.getCompressionType());
- pageHeadersInChunk.add(pageHeader);
- dataInChunk.add(pageData);
- dataSize -=
- (Integer.BYTES * 2 // the bytes size of uncompressedSize and compressedSize
- // count, startTime, endTime bytes size in old statistics
- + 24
- // statistics bytes size
- // new boolean StatsSize is 8 bytes larger than old one
- + (pageHeader.getStatistics().getStatsSize()
- - (dataType == TSDataType.BOOLEAN ? 8 : 0))
- // page data bytes
- + pageHeader.getCompressedSize());
+ if (skipReadingChunk || deviceId == null) {
+ ChunkHeader header = ((TsFileSequenceReaderForV2) reader).readChunkHeader();
+ int dataSize = header.getDataSize();
+ while (dataSize > 0) {
+ // a new Page
+ PageHeader pageHeader =
+ ((TsFileSequenceReaderForV2) reader).readPageHeader(header.getDataType());
+ ((TsFileSequenceReaderForV2) reader).readCompressedPage(pageHeader);
+ dataSize -=
+ (Integer.BYTES * 2 // the bytes size of uncompressedSize and compressedSize
+ // count, startTime, endTime bytes size in old statistics
+ + 24
+ // statistics bytes size
+ // new boolean StatsSize is 8 bytes larger than old one
+ + (pageHeader.getStatistics().getStatsSize()
+ - (header.getDataType() == TSDataType.BOOLEAN ? 8 : 0))
+ // page data bytes
+ + pageHeader.getCompressedSize());
+ }
+ } else {
+ ChunkHeader header = ((TsFileSequenceReaderForV2) reader).readChunkHeader();
+ MeasurementSchema measurementSchema =
+ new MeasurementSchema(
+ header.getMeasurementID(),
+ header.getDataType(),
+ header.getEncodingType(),
+ header.getCompressionType());
+ TSDataType dataType = header.getDataType();
+ TSEncoding encoding = header.getEncodingType();
+ List<PageHeader> pageHeadersInChunk = new ArrayList<>();
+ List<ByteBuffer> dataInChunk = new ArrayList<>();
+ List<Boolean> needToDecodeInfo = new ArrayList<>();
+ int dataSize = header.getDataSize();
+ while (dataSize > 0) {
+ // a new Page
+ PageHeader pageHeader =
+ ((TsFileSequenceReaderForV2) reader).readPageHeader(dataType);
+ boolean needToDecode = checkIfNeedToDecode(dataType, encoding, pageHeader);
+ needToDecodeInfo.add(needToDecode);
+ ByteBuffer pageData =
+ !needToDecode
+ ? ((TsFileSequenceReaderForV2) reader).readCompressedPage(pageHeader)
+ : reader.readPage(pageHeader, header.getCompressionType());
+ pageHeadersInChunk.add(pageHeader);
+ dataInChunk.add(pageData);
+ dataSize -=
+ (Integer.BYTES * 2 // the bytes size of uncompressedSize and compressedSize
+ // count, startTime, endTime bytes size in old statistics
+ + 24
+ // statistics bytes size
+ // new boolean StatsSize is 8 bytes larger than old one
+ + (pageHeader.getStatistics().getStatsSize()
+ - (dataType == TSDataType.BOOLEAN ? 8 : 0))
+ // page data bytes
+ + pageHeader.getCompressedSize());
+ }
+ reWriteChunk(
+ deviceId,
+ firstChunkInChunkGroup,
+ measurementSchema,
+ pageHeadersInChunk,
+ dataInChunk,
+ needToDecodeInfo);
+ if (firstChunkInChunkGroup) {
+ firstChunkInChunkGroup = false;
+ }
}
- pageHeadersInChunkGroup.add(pageHeadersInChunk);
- pageDataInChunkGroup.add(dataInChunk);
- needToDecodeInfoInChunkGroup.add(needToDecodeInfo);
break;
case MetaMarker.CHUNK_GROUP_HEADER:
// this is the footer of a ChunkGroup in TsFileV2.
- ChunkGroupHeader chunkGroupFooter =
- ((TsFileSequenceReaderForV2) reader).readChunkGroupFooter();
- String deviceID = chunkGroupFooter.getDeviceID();
- rewrite(
- deviceID,
- measurementSchemaList,
- pageHeadersInChunkGroup,
- pageDataInChunkGroup,
- needToDecodeInfoInChunkGroup);
- pageHeadersInChunkGroup.clear();
- pageDataInChunkGroup.clear();
- measurementSchemaList.clear();
- needToDecodeInfoInChunkGroup.clear();
+ if (skipReadingChunk) {
+ skipReadingChunk = false;
+ ChunkGroupHeader chunkGroupFooter =
+ ((TsFileSequenceReaderForV2) reader).readChunkGroupFooter();
+ deviceId = chunkGroupFooter.getDeviceID();
+ reader.position(firstChunkPositionInChunkGroup);
+ } else {
+ endChunkGroup();
+ skipReadingChunk = true;
+ ((TsFileSequenceReaderForV2) reader).readChunkGroupFooter();
+ deviceId = null;
+ firstChunkPositionInChunkGroup = reader.position();
+ firstChunkInChunkGroup = true;
+ }
break;
case MetaMarker.VERSION:
long version = ((TsFileSequenceReaderForV2) reader).readVersion();
@@ -184,6 +212,7 @@ public class TsFileOnlineUpgradeTool extends TsFileRewriteTool {
for (TsFileIOWriter tsFileIOWriter : partitionWriterMap.values()) {
tsFileIOWriter.writePlanIndices();
}
+ firstChunkPositionInChunkGroup = reader.position();
break;
default:
// the disk file is corrupted, using this file may be dangerous
@@ -213,7 +242,7 @@ public class TsFileOnlineUpgradeTool extends TsFileRewriteTool {
currentMod = null;
}
}
- } catch (IOException e2) {
+ } catch (Exception e2) {
throw new IOException(
"TsFile upgrade process cannot proceed at position "
+ reader.position()
@@ -249,16 +278,16 @@ public class TsFileOnlineUpgradeTool extends TsFileRewriteTool {
}
@Override
- protected void decodeAndWritePageInToFiles(
- IMeasurementSchema schema,
+ protected void decodeAndWritePage(
+ MeasurementSchema schema,
ByteBuffer pageData,
- Map<Long, Map<IMeasurementSchema, ChunkWriterImpl>> chunkWritersInChunkGroup)
+ Map<Long, ChunkWriterImpl> partitionChunkWriterMap)
throws IOException {
valueDecoder.reset();
PageReaderV2 pageReader =
new PageReaderV2(pageData, schema.getType(), valueDecoder, defaultTimeDecoder, null);
BatchData batchData = pageReader.getAllSatisfiedPageData();
- rewritePageIntoFiles(batchData, schema, chunkWritersInChunkGroup);
+ rewritePageIntoFiles(batchData, schema, partitionChunkWriterMap);
}
/** check if the file to be upgraded has correct magic strings and version number */