You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2021/08/03 03:34:06 UTC
[iotdb] 01/01: Optimize the Upgrade Tool rewrite logic
This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch upgradeMem
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit c5b2ab090c76e588d3f945256357c50735e82543
Author: HTHou <hh...@outlook.com>
AuthorDate: Tue Aug 3 11:29:56 2021 +0800
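Optimize the Upgrade Tool rewrite logic: re-encode each chunk into per-partition chunk writers as soon as it is read, instead of buffering all page headers and page data of a chunk group in memory until the chunk group ends.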
---
.../apache/iotdb/db/tools/TsFileRewriteTool.java | 143 +++++++++++----------
.../db/tools/upgrade/TsFileOnlineUpgradeTool.java | 32 ++---
2 files changed, 89 insertions(+), 86 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/tools/TsFileRewriteTool.java b/server/src/main/java/org/apache/iotdb/db/tools/TsFileRewriteTool.java
index e1e4bc7..54e1560 100644
--- a/server/src/main/java/org/apache/iotdb/db/tools/TsFileRewriteTool.java
+++ b/server/src/main/java/org/apache/iotdb/db/tools/TsFileRewriteTool.java
@@ -1,20 +1,16 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
*/
package org.apache.iotdb.db.tools;
@@ -158,11 +154,9 @@ public class TsFileRewriteTool implements AutoCloseable {
int headerLength = TSFileConfig.MAGIC_STRING.getBytes().length + Byte.BYTES;
reader.position(headerLength);
// start to scan chunks and chunkGroups
- List<List<PageHeader>> pageHeadersInChunkGroup = new ArrayList<>();
- List<List<ByteBuffer>> pageDataInChunkGroup = new ArrayList<>();
- List<List<Boolean>> needToDecodeInfoInChunkGroup = new ArrayList<>();
byte marker;
- List<MeasurementSchema> measurementSchemaList = new ArrayList<>();
+
+ Map<Long, Map<MeasurementSchema, ChunkWriterImpl>> chunkWritersInChunkGroup = new HashMap<>();
String lastChunkGroupDeviceId = null;
try {
while ((marker = reader.readMarker()) != MetaMarker.SEPARATOR) {
@@ -176,7 +170,6 @@ public class TsFileRewriteTool implements AutoCloseable {
header.getDataType(),
header.getEncodingType(),
header.getCompressionType());
- measurementSchemaList.add(measurementSchema);
TSDataType dataType = header.getDataType();
TSEncoding encoding = header.getEncodingType();
List<PageHeader> pageHeadersInChunk = new ArrayList<>();
@@ -194,24 +187,19 @@ public class TsFileRewriteTool implements AutoCloseable {
dataInChunk.add(pageData);
dataSize -= pageHeader.getSerializedPageSize();
}
- pageHeadersInChunkGroup.add(pageHeadersInChunk);
- pageDataInChunkGroup.add(dataInChunk);
- needToDecodeInfoInChunkGroup.add(needToDecodeInfo);
+ reEncodeChunk(
+ measurementSchema,
+ pageHeadersInChunk,
+ dataInChunk,
+ needToDecodeInfo,
+ chunkWritersInChunkGroup);
break;
case MetaMarker.CHUNK_GROUP_HEADER:
ChunkGroupHeader chunkGroupHeader = reader.readChunkGroupHeader();
String deviceId = chunkGroupHeader.getDeviceID();
- if (lastChunkGroupDeviceId != null && !measurementSchemaList.isEmpty()) {
- rewrite(
- lastChunkGroupDeviceId,
- measurementSchemaList,
- pageHeadersInChunkGroup,
- pageDataInChunkGroup,
- needToDecodeInfoInChunkGroup);
- pageHeadersInChunkGroup.clear();
- pageDataInChunkGroup.clear();
- measurementSchemaList.clear();
- needToDecodeInfoInChunkGroup.clear();
+ if (lastChunkGroupDeviceId != null && !chunkWritersInChunkGroup.isEmpty()) {
+ reWriteChunkGroupToFile(lastChunkGroupDeviceId, chunkWritersInChunkGroup);
+ chunkWritersInChunkGroup.clear();
}
lastChunkGroupDeviceId = deviceId;
break;
@@ -239,17 +227,9 @@ public class TsFileRewriteTool implements AutoCloseable {
}
}
- if (!measurementSchemaList.isEmpty()) {
- rewrite(
- lastChunkGroupDeviceId,
- measurementSchemaList,
- pageHeadersInChunkGroup,
- pageDataInChunkGroup,
- needToDecodeInfoInChunkGroup);
- pageHeadersInChunkGroup.clear();
- pageDataInChunkGroup.clear();
- measurementSchemaList.clear();
- needToDecodeInfoInChunkGroup.clear();
+ if (!chunkWritersInChunkGroup.isEmpty()) {
+ reWriteChunkGroupToFile(lastChunkGroupDeviceId, chunkWritersInChunkGroup);
+ chunkWritersInChunkGroup.clear();
}
// close upgraded tsFiles and generate resources for them
for (TsFileIOWriter tsFileIOWriter : partitionWriterMap.values()) {
@@ -305,30 +285,61 @@ public class TsFileRewriteTool implements AutoCloseable {
* this case, we have to decode the data to points, and then rewrite the data points to different
* chunkWriters, finally write chunks to their own upgraded TsFiles.
*/
- protected void rewrite(
- String deviceId,
- List<MeasurementSchema> schemas,
- List<List<PageHeader>> pageHeadersInChunkGroup,
- List<List<ByteBuffer>> dataInChunkGroup,
- List<List<Boolean>> needToDecodeInfoInChunkGroup)
+ // protected void rewrite(String deviceId, List<MeasurementSchema> schemas,
+ // List<List<PageHeader>> pageHeadersInChunkGroup, List<List<ByteBuffer>> dataInChunkGroup,
+ // List<List<Boolean>> needToDecodeInfoInChunkGroup) throws IOException, PageException {
+ // Map<Long, Map<MeasurementSchema, ChunkWriterImpl>> chunkWritersInChunkGroup = new
+ // HashMap<>();
+ // for (int i = 0; i < schemas.size(); i++) {
+ // MeasurementSchema schema = schemas.get(i);
+ // List<ByteBuffer> pageDataInChunk = dataInChunkGroup.get(i);
+ // List<PageHeader> pageHeadersInChunk = pageHeadersInChunkGroup.get(i);
+ // List<Boolean> needToDecodeInfoInChunk = needToDecodeInfoInChunkGroup.get(i);
+ // valueDecoder = Decoder.getDecoderByType(schema.getEncodingType(), schema.getType());
+ // for (int j = 0; j < pageDataInChunk.size(); j++) {
+ // if (Boolean.TRUE.equals(needToDecodeInfoInChunk.get(j))) {
+ // decodeAndWritePage(schema, pageDataInChunk.get(j), chunkWritersInChunkGroup);
+ // } else {
+ // writePage(schema, pageHeadersInChunk.get(j), pageDataInChunk.get(j),
+ // chunkWritersInChunkGroup);
+ // }
+ // }
+ // }
+ //
+ // for (Entry<Long, Map<MeasurementSchema, ChunkWriterImpl>> entry : chunkWritersInChunkGroup
+ // .entrySet()) {
+ // long partitionId = entry.getKey();
+ // TsFileIOWriter tsFileIOWriter = partitionWriterMap.get(partitionId);
+ // tsFileIOWriter.startChunkGroup(deviceId);
+ // // write chunks to their own upgraded tsFiles
+ // for (IChunkWriter chunkWriter : entry.getValue().values()) {
+ // chunkWriter.writeToFileWriter(tsFileIOWriter);
+ // }
+ // tsFileIOWriter.endChunkGroup();
+ // }
+ // }
+
+ protected void reEncodeChunk(
+ MeasurementSchema schema,
+ List<PageHeader> pageHeadersInChunk,
+ List<ByteBuffer> pageDataInChunk,
+ List<Boolean> needToDecodeInfoInChunk,
+ Map<Long, Map<MeasurementSchema, ChunkWriterImpl>> chunkWritersInChunkGroup)
throws IOException, PageException {
- Map<Long, Map<MeasurementSchema, ChunkWriterImpl>> chunkWritersInChunkGroup = new HashMap<>();
- for (int i = 0; i < schemas.size(); i++) {
- MeasurementSchema schema = schemas.get(i);
- List<ByteBuffer> pageDataInChunk = dataInChunkGroup.get(i);
- List<PageHeader> pageHeadersInChunk = pageHeadersInChunkGroup.get(i);
- List<Boolean> needToDecodeInfoInChunk = needToDecodeInfoInChunkGroup.get(i);
- valueDecoder = Decoder.getDecoderByType(schema.getEncodingType(), schema.getType());
- for (int j = 0; j < pageDataInChunk.size(); j++) {
- if (Boolean.TRUE.equals(needToDecodeInfoInChunk.get(j))) {
- decodeAndWritePageInToFiles(schema, pageDataInChunk.get(j), chunkWritersInChunkGroup);
- } else {
- writePageInToFile(
- schema, pageHeadersInChunk.get(j), pageDataInChunk.get(j), chunkWritersInChunkGroup);
- }
+ valueDecoder = Decoder.getDecoderByType(schema.getEncodingType(), schema.getType());
+ for (int i = 0; i < pageDataInChunk.size(); i++) {
+ if (Boolean.TRUE.equals(needToDecodeInfoInChunk.get(i))) {
+ decodeAndWritePage(schema, pageDataInChunk.get(i), chunkWritersInChunkGroup);
+ } else {
+ writePage(
+ schema, pageHeadersInChunk.get(i), pageDataInChunk.get(i), chunkWritersInChunkGroup);
}
}
+ }
+ protected void reWriteChunkGroupToFile(
+ String deviceId, Map<Long, Map<MeasurementSchema, ChunkWriterImpl>> chunkWritersInChunkGroup)
+ throws IOException {
for (Entry<Long, Map<MeasurementSchema, ChunkWriterImpl>> entry :
chunkWritersInChunkGroup.entrySet()) {
long partitionId = entry.getKey();
@@ -380,7 +391,7 @@ public class TsFileRewriteTool implements AutoCloseable {
});
}
- protected void writePageInToFile(
+ protected void writePage(
MeasurementSchema schema,
PageHeader pageHeader,
ByteBuffer pageData,
@@ -396,7 +407,7 @@ public class TsFileRewriteTool implements AutoCloseable {
chunkWritersInChunkGroup.put(partitionId, chunkWriters);
}
- protected void decodeAndWritePageInToFiles(
+ protected void decodeAndWritePage(
MeasurementSchema schema,
ByteBuffer pageData,
Map<Long, Map<MeasurementSchema, ChunkWriterImpl>> chunkWritersInChunkGroup)
diff --git a/server/src/main/java/org/apache/iotdb/db/tools/upgrade/TsFileOnlineUpgradeTool.java b/server/src/main/java/org/apache/iotdb/db/tools/upgrade/TsFileOnlineUpgradeTool.java
index 90646b5..1c48488 100644
--- a/server/src/main/java/org/apache/iotdb/db/tools/upgrade/TsFileOnlineUpgradeTool.java
+++ b/server/src/main/java/org/apache/iotdb/db/tools/upgrade/TsFileOnlineUpgradeTool.java
@@ -45,6 +45,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -96,11 +97,8 @@ public class TsFileOnlineUpgradeTool extends TsFileRewriteTool {
TSFileConfig.MAGIC_STRING.getBytes().length
+ TSFileConfig.VERSION_NUMBER_V2.getBytes().length;
reader.position(headerLength);
- List<List<PageHeader>> pageHeadersInChunkGroup = new ArrayList<>();
- List<List<ByteBuffer>> pageDataInChunkGroup = new ArrayList<>();
- List<List<Boolean>> needToDecodeInfoInChunkGroup = new ArrayList<>();
byte marker;
- List<MeasurementSchema> measurementSchemaList = new ArrayList<>();
+ Map<Long, Map<MeasurementSchema, ChunkWriterImpl>> chunkWritersInChunkGroup = new HashMap<>();
try {
while ((marker = reader.readMarker()) != MetaMarker.SEPARATOR) {
switch (marker) {
@@ -112,7 +110,6 @@ public class TsFileOnlineUpgradeTool extends TsFileRewriteTool {
header.getDataType(),
header.getEncodingType(),
header.getCompressionType());
- measurementSchemaList.add(measurementSchema);
TSDataType dataType = header.getDataType();
TSEncoding encoding = header.getEncodingType();
List<PageHeader> pageHeadersInChunk = new ArrayList<>();
@@ -141,25 +138,20 @@ public class TsFileOnlineUpgradeTool extends TsFileRewriteTool {
// page data bytes
+ pageHeader.getCompressedSize());
}
- pageHeadersInChunkGroup.add(pageHeadersInChunk);
- pageDataInChunkGroup.add(dataInChunk);
- needToDecodeInfoInChunkGroup.add(needToDecodeInfo);
+ reEncodeChunk(
+ measurementSchema,
+ pageHeadersInChunk,
+ dataInChunk,
+ needToDecodeInfo,
+ chunkWritersInChunkGroup);
break;
case MetaMarker.CHUNK_GROUP_HEADER:
// this is the footer of a ChunkGroup in TsFileV2.
ChunkGroupHeader chunkGroupFooter =
((TsFileSequenceReaderForV2) reader).readChunkGroupFooter();
String deviceID = chunkGroupFooter.getDeviceID();
- rewrite(
- deviceID,
- measurementSchemaList,
- pageHeadersInChunkGroup,
- pageDataInChunkGroup,
- needToDecodeInfoInChunkGroup);
- pageHeadersInChunkGroup.clear();
- pageDataInChunkGroup.clear();
- measurementSchemaList.clear();
- needToDecodeInfoInChunkGroup.clear();
+ reWriteChunkGroupToFile(deviceID, chunkWritersInChunkGroup);
+ chunkWritersInChunkGroup.clear();
break;
case MetaMarker.VERSION:
long version = ((TsFileSequenceReaderForV2) reader).readVersion();
@@ -216,7 +208,7 @@ public class TsFileOnlineUpgradeTool extends TsFileRewriteTool {
currentMod = null;
}
}
- } catch (IOException e2) {
+ } catch (Exception e2) {
throw new IOException(
"TsFile upgrade process cannot proceed at position "
+ reader.position()
@@ -252,7 +244,7 @@ public class TsFileOnlineUpgradeTool extends TsFileRewriteTool {
}
@Override
- protected void decodeAndWritePageInToFiles(
+ protected void decodeAndWritePage(
MeasurementSchema schema,
ByteBuffer pageData,
Map<Long, Map<MeasurementSchema, ChunkWriterImpl>> chunkWritersInChunkGroup)