You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ma...@apache.org on 2022/10/20 01:49:50 UTC
[iotdb] 01/01: refactor rewrite tool
This is an automated email from the ASF dual-hosted git repository.
marklau99 pushed a commit to branch IOTDB-4693
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 6667567e442268924fb8c29595aec373df392bc4
Author: Liu Xuxin <li...@outlook.com>
AuthorDate: Thu Oct 20 09:49:32 2022 +0800
refactor rewrite tool
---
.../java/org/apache/iotdb/RewriteTsFileTool.java | 416 +++++++++++++--------
1 file changed, 254 insertions(+), 162 deletions(-)
diff --git a/rewrite-tsfile-tool/src/main/java/org/apache/iotdb/RewriteTsFileTool.java b/rewrite-tsfile-tool/src/main/java/org/apache/iotdb/RewriteTsFileTool.java
index 4b1d47976d..c2cc99d0e9 100644
--- a/rewrite-tsfile-tool/src/main/java/org/apache/iotdb/RewriteTsFileTool.java
+++ b/rewrite-tsfile-tool/src/main/java/org/apache/iotdb/RewriteTsFileTool.java
@@ -27,42 +27,38 @@ import org.apache.iotdb.db.engine.modification.Deletion;
import org.apache.iotdb.db.engine.modification.Modification;
import org.apache.iotdb.db.engine.modification.ModificationFile;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
-import org.apache.iotdb.db.utils.QueryUtils;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.session.Session;
import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
+import org.apache.iotdb.tsfile.encoding.decoder.Decoder;
import org.apache.iotdb.tsfile.exception.write.NoMeasurementException;
import org.apache.iotdb.tsfile.file.MetaMarker;
import org.apache.iotdb.tsfile.file.header.ChunkGroupHeader;
import org.apache.iotdb.tsfile.file.header.ChunkHeader;
+import org.apache.iotdb.tsfile.file.header.PageHeader;
import org.apache.iotdb.tsfile.file.metadata.AlignedChunkMetadata;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
import org.apache.iotdb.tsfile.read.TimeValuePair;
import org.apache.iotdb.tsfile.read.TsFileAlignedSeriesReaderIterator;
+import org.apache.iotdb.tsfile.read.TsFileCheckStatus;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
+import org.apache.iotdb.tsfile.read.common.BatchData;
import org.apache.iotdb.tsfile.read.common.Chunk;
-import org.apache.iotdb.tsfile.read.common.Field;
import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
-import org.apache.iotdb.tsfile.read.common.Path;
-import org.apache.iotdb.tsfile.read.common.RowRecord;
-import org.apache.iotdb.tsfile.read.controller.CachedChunkLoaderImpl;
-import org.apache.iotdb.tsfile.read.controller.IChunkLoader;
-import org.apache.iotdb.tsfile.read.controller.IMetadataQuerier;
-import org.apache.iotdb.tsfile.read.controller.MetadataQuerierByFileImpl;
-import org.apache.iotdb.tsfile.read.query.dataset.DataSetWithoutTimeGenerator;
-import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
import org.apache.iotdb.tsfile.read.reader.IChunkReader;
import org.apache.iotdb.tsfile.read.reader.IPointReader;
import org.apache.iotdb.tsfile.read.reader.chunk.AlignedChunkReader;
import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
-import org.apache.iotdb.tsfile.read.reader.series.AbstractFileSeriesReader;
-import org.apache.iotdb.tsfile.read.reader.series.EmptyFileSeriesReader;
-import org.apache.iotdb.tsfile.read.reader.series.FileSeriesReader;
+import org.apache.iotdb.tsfile.read.reader.page.PageReader;
+import org.apache.iotdb.tsfile.read.reader.page.TimePageReader;
+import org.apache.iotdb.tsfile.read.reader.page.ValuePageReader;
import org.apache.iotdb.tsfile.utils.FilePathUtils;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
@@ -79,12 +75,13 @@ import org.apache.commons.cli.ParseException;
import java.io.File;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -99,8 +96,7 @@ public class RewriteTsFileTool {
private static String password = "root";
private static String filePath = "";
private static String readMode = "s";
-
- private static Map<String, Set<MeasurementSchema>> device2Measurements;
+ private static boolean ignoreBrokenChunk = false;
public static void main(String[] args) {
Session session = null;
@@ -138,6 +134,7 @@ public class RewriteTsFileTool {
password = getArgOrDefault(commandLine, "pw", password);
filePath = getArgOrDefault(commandLine, "f", filePath);
readMode = getArgOrDefault(commandLine, "rm", readMode);
+ ignoreBrokenChunk = commandLine.hasOption("ignore-broken");
} catch (ParseException e) {
System.out.printf("Parse Args Error. %s%n", e.getMessage());
priHelp(options);
@@ -203,6 +200,15 @@ public class RewriteTsFileTool {
.required()
.build();
options.addOption(readModeOpt);
+
+ Option ignoreBrokenChunkOpt =
+ Option.builder("ignore-broken")
+ .argName("IgnoreBrokenChunks")
+ .hasArg()
+ .desc("Ignore the broken chunks in the tsfile")
+ .type(boolean.class)
+ .build();
+ options.addOption(ignoreBrokenChunkOpt);
return options;
}
@@ -231,27 +237,10 @@ public class RewriteTsFileTool {
*/
public static void writeToIoTDB(List<File> files, Session session) {
sortTsFiles(files);
- int size = files.size();
- List<File> unloadTsFiles = new ArrayList<>();
- System.out.printf("Collect TsFiles successfully, %d files to be loaded.%n", size);
+ System.out.printf("Collect TsFiles successfully, %d files to be loaded.%n", files.size());
System.out.println("Start Loading TsFiles...");
if (readMode.equals("s")) {
- for (int i = 0; i < size; i++) {
- File file = files.get(i);
- System.out.printf("Loading %s(%d/%d)...", file.getPath(), i + 1, size);
- try {
- seqWriteTsFile(file.getPath(), session);
- } catch (Exception e) {
- System.out.println(
- "------------------------------Error Message------------------------------");
- e.printStackTrace();
- System.out.println(
- "------------------------------End Message------------------------------");
- unloadTsFiles.add(file);
- continue;
- }
- System.out.println("Done");
- }
+ writeTsFileSequentially(files, session);
} else {
try {
reverseWriteTsFile(files, session);
@@ -268,6 +257,27 @@ public class RewriteTsFileTool {
}
}
System.out.println("Finish Loading TsFiles");
+ }
+
+ private static void writeTsFileSequentially(List<File> files, Session session) {
+ int size = files.size();
+ List<File> unloadTsFiles = new ArrayList<>();
+ for (int i = 0; i < files.size(); i++) {
+ File file = files.get(i);
+ System.out.printf("Loading %s(%d/%d)...", file.getPath(), i + 1, size);
+ try {
+ seqWriteSingleTsFile(file.getPath(), session);
+ } catch (Exception e) {
+ System.out.println(
+ "------------------------------Error Message------------------------------");
+ e.printStackTrace();
+ System.out.println(
+ "------------------------------End Message------------------------------");
+ unloadTsFiles.add(file);
+ continue;
+ }
+ System.out.println("Done");
+ }
System.out.printf(
"Load %d TsFiles successfully, %d TsFiles not loaded.%n",
size - unloadTsFiles.size(), unloadTsFiles.size());
@@ -306,104 +316,31 @@ public class RewriteTsFileTool {
* @param filename the file path to be loaded
* @param session IoTDB session
*/
- public static void seqWriteTsFile(String filename, Session session)
+ public static void seqWriteSingleTsFile(String filename, Session session)
throws IOException, IllegalPathException, IoTDBConnectionException,
StatementExecutionException, NoMeasurementException {
- // parse modifications from .mods
- List<Modification> modifications = null;
- if (FSFactoryProducer.getFSFactory()
- .getFile(filename + ModificationFile.FILE_SUFFIX)
- .exists()) {
- modifications =
- (List<Modification>)
- new ModificationFile(filename + ModificationFile.FILE_SUFFIX).getModifications();
- }
-
- // read all device and their measurements
- parseDeviceFromTsFile(filename);
try (TsFileSequenceReader reader = new TsFileSequenceReader(filename)) {
- for (Map.Entry<String, Set<MeasurementSchema>> entry : device2Measurements.entrySet()) {
- // collect measurements for device
- boolean isAligned = false;
- String curDevice = entry.getKey();
- List<MeasurementSchema> measurementSchemas = new ArrayList<>();
- ArrayList<Path> paths = new ArrayList<>();
- for (MeasurementSchema measurementSchema : entry.getValue()) {
- if (!measurementSchema.getType().equals(TSDataType.VECTOR)) {
- measurementSchemas.add(measurementSchema);
- } else {
- isAligned = true;
- }
- }
- for (MeasurementSchema measurementSchema : measurementSchemas) {
- paths.add(new Path(curDevice, measurementSchema.getMeasurementId()));
- }
-
- // construct query to this tsfile
- List<AbstractFileSeriesReader> readersOfSelectedSeries = new ArrayList<>();
- List<TSDataType> dataTypes = new ArrayList<>();
- IMetadataQuerier metadataQuerier = new MetadataQuerierByFileImpl(reader);
- IChunkLoader chunkLoader = new CachedChunkLoaderImpl(reader);
- for (Path path : paths) {
- List<IChunkMetadata> chunkMetadataList = metadataQuerier.getChunkMetaDataList(path);
- modifyChunkMetadata(isAligned, path, chunkMetadataList, modifications);
- AbstractFileSeriesReader seriesReader;
- if (chunkMetadataList.isEmpty()) {
- seriesReader = new EmptyFileSeriesReader();
- dataTypes.add(metadataQuerier.getDataType(path));
- } else {
- seriesReader = new FileSeriesReader(chunkLoader, chunkMetadataList, null);
- dataTypes.add(chunkMetadataList.get(0).getDataType());
- }
- readersOfSelectedSeries.add(seriesReader);
- }
-
- // read data from tsfile and construct session to send to IoTDB
- QueryDataSet dataSet =
- new DataSetWithoutTimeGenerator(paths, dataTypes, readersOfSelectedSeries);
- Tablet tablet = new Tablet(curDevice, measurementSchemas, MAX_TABLET_LENGTH);
- tablet.initBitMaps();
- int measurementSize = measurementSchemas.size();
- while (dataSet.hasNext()) {
- RowRecord rowRecord = dataSet.next();
- tablet.addTimestamp(tablet.rowSize, rowRecord.getTimestamp());
- for (int i = 0; i < measurementSize; i++) {
- Field field = rowRecord.getFields().get(i);
- if (field == null) {
- tablet.bitMaps[i].mark(tablet.rowSize);
- } else {
- tablet.addValue(
- measurementSchemas.get(i).getMeasurementId(),
- tablet.rowSize,
- field.getObjectValue(field.getDataType()));
- }
- }
- tablet.rowSize++;
- if (tablet.rowSize == MAX_TABLET_LENGTH) {
- if (isAligned) {
- session.insertAlignedTablet(tablet);
- } else {
- session.insertTablet(tablet);
- }
- tablet.reset();
- }
- }
- if (isAligned) {
- session.insertAlignedTablet(tablet);
- } else {
- session.insertTablet(tablet);
+ if (!ignoreBrokenChunk) {
+ long status = reader.selfCheck(new HashMap<>(), new ArrayList<>(), true);
+ if (status == TsFileCheckStatus.INCOMPATIBLE_FILE
+ || status == TsFileCheckStatus.FILE_EXISTS_MISTAKES) {
+ throw new IOException(
+ String.format(
+ "The file %s is incompatible, cannot rewrite it to IoTDB. If you want to rewrite "
+ + "all the good chunks in the file, retry with option -ignore-broken.",
+ filename));
}
}
- }
- }
-
- private static void parseDeviceFromTsFile(String filename) throws IOException {
- device2Measurements = new HashMap<>();
- try (TsFileSequenceReader reader = new TsFileSequenceReader(filename)) {
reader.position((long) TSFileConfig.MAGIC_STRING.getBytes().length + 1);
- String curDevice = null;
+ List<long[]> timeBatch = new ArrayList<>();
+ int pageIndex = 0;
byte marker;
+ String currentDevice = null;
+ boolean isAlignedChunk = false;
+ List<List<TsPrimitiveType>> valueForAlignedSeries = new ArrayList<>();
+ List<Long> timeForAlignedSeries = new ArrayList<>();
+ List<MeasurementSchema> schemaForAlignedSeries = new ArrayList<>();
while ((marker = reader.readMarker()) != MetaMarker.SEPARATOR) {
switch (marker) {
case MetaMarker.CHUNK_HEADER:
@@ -413,60 +350,215 @@ public class RewriteTsFileTool {
case MetaMarker.ONLY_ONE_PAGE_TIME_CHUNK_HEADER:
case MetaMarker.ONLY_ONE_PAGE_VALUE_CHUNK_HEADER:
ChunkHeader header = reader.readChunkHeader(marker);
- MeasurementSchema measurementSchema =
- new MeasurementSchema(
- header.getMeasurementID(),
- header.getDataType(),
- header.getEncodingType(),
- header.getCompressionType());
- device2Measurements
- .computeIfAbsent(curDevice, o -> new HashSet<>())
- .add(measurementSchema);
- reader.position(reader.position() + header.getDataSize());
+ if (header.getDataSize() == 0) {
+ // empty value chunk
+ break;
+ }
+ Decoder defaultTimeDecoder =
+ Decoder.getDecoderByType(
+ TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder()),
+ TSDataType.INT64);
+ Decoder valueDecoder =
+ Decoder.getDecoderByType(header.getEncodingType(), header.getDataType());
+ int dataSize = header.getDataSize();
+ pageIndex = 0;
+ if (header.getDataType() == TSDataType.VECTOR) {
+ timeBatch.clear();
+ }
+ boolean addSchema = false;
+ List<TsPrimitiveType> valueList = new ArrayList<>();
+ while (dataSize > 0) {
+ valueDecoder.reset();
+ PageHeader pageHeader =
+ reader.readPageHeader(
+ header.getDataType(),
+ (header.getChunkType() & 0x3F) == MetaMarker.CHUNK_HEADER);
+ ByteBuffer pageData = reader.readPage(pageHeader, header.getCompressionType());
+ if ((header.getChunkType() & (byte) TsFileConstant.TIME_COLUMN_MASK)
+ == (byte) TsFileConstant.TIME_COLUMN_MASK) { // Time Chunk
+ readAndSendTimePage(
+ addSchema,
+ schemaForAlignedSeries,
+ header,
+ pageHeader,
+ pageData,
+ defaultTimeDecoder,
+ timeBatch,
+ pageIndex,
+ timeForAlignedSeries);
+ addSchema = true;
+ isAlignedChunk = true;
+ } else if ((header.getChunkType() & (byte) TsFileConstant.VALUE_COLUMN_MASK)
+ == (byte) TsFileConstant.VALUE_COLUMN_MASK) { // Value Chunk
+ readAndSendValuePage(
+ addSchema,
+ schemaForAlignedSeries,
+ header,
+ pageHeader,
+ pageData,
+ valueDecoder,
+ timeBatch,
+ pageIndex,
+ valueList);
+ } else { // NonAligned Chunk
+ readAndSendSingleSeriesPage(
+ currentDevice, header, pageData, valueDecoder, defaultTimeDecoder, session);
+ }
+ pageIndex++;
+ dataSize -= pageHeader.getSerializedPageSize();
+ }
+ if (isAlignedChunk) {
+ valueForAlignedSeries.add(valueList);
+ }
break;
case MetaMarker.CHUNK_GROUP_HEADER:
+ // get the next chunk group
+ if (isAlignedChunk) {
+ Tablet tablet = new Tablet(currentDevice, schemaForAlignedSeries, MAX_TABLET_LENGTH);
+ for (int i = 0; i < timeForAlignedSeries.size(); ++i) {
+ tablet.timestamps[tablet.rowSize] = timeForAlignedSeries.get(i);
+ TsPrimitiveType[] values = new TsPrimitiveType[valueForAlignedSeries.size()];
+ for (int j = 0; j < valueForAlignedSeries.size(); ++j) {
+ if (valueForAlignedSeries.get(j).size() < i) {
+ values[j] = valueForAlignedSeries.get(j).get(i);
+ } else {
+ values[j] = null;
+ }
+ }
+ tablet.values[tablet.rowSize++] = values;
+ if (tablet.rowSize >= MAX_TABLET_LENGTH) {
+ session.insertAlignedTablet(tablet);
+ tablet.reset();
+ }
+ }
+ if (tablet.rowSize >= 0) {
+ session.insertAlignedTablet(tablet);
+ tablet.reset();
+ }
+ }
+ isAlignedChunk = false;
ChunkGroupHeader chunkGroupHeader = reader.readChunkGroupHeader();
- curDevice = chunkGroupHeader.getDeviceID();
+ currentDevice = chunkGroupHeader.getDeviceID();
break;
case MetaMarker.OPERATION_INDEX_RANGE:
reader.readPlanIndex();
break;
default:
- MetaMarker.handleUnexpectedMarker(marker);
+ System.out.printf(
+ "Cannot handle marker %d in position %d, stop reading %s%n",
+ marker, reader.position(), filename);
}
}
}
+
+ writeModification(filename, session);
}
- private static void modifyChunkMetadata(
- boolean isAligned,
- Path path,
- List<IChunkMetadata> chunkMetadataList,
- List<Modification> modifications)
- throws IllegalPathException {
- if (modifications == null || modifications.isEmpty()) {
- return;
+  /**
+   * Decodes one value page of an aligned chunk and appends the decoded values to {@code
+   * valueList}. Nothing is written to IoTDB here; the values are only collected.
+   *
+   * <p>NOTE(review): {@code addSchema} is passed by value, so this method cannot flip the caller's
+   * flag. The caller has to set its own flag after the first page of a chunk, otherwise the same
+   * measurement is registered once per page - confirm against the call site.
+   *
+   * @param addSchema {@code true} if the schema of this chunk has already been registered
+   * @param schemaForAlignedSeries receives a schema built from {@code header} when {@code
+   *     addSchema} is {@code false}
+   * @param header header of the chunk the page belongs to
+   * @param pageHeader header of the page to decode
+   * @param pageData page payload as returned by the sequence reader
+   * @param valueDecoder decoder matching the chunk's encoding and data type
+   * @param timeBatch timestamps of the time pages read so far, indexed by page
+   * @param pageIndex index into {@code timeBatch} of the time page that matches this value page
+   * @param valueList receives every decoded value of the page, in order
+   */
+  private static void readAndSendValuePage(
+      boolean addSchema,
+      List<MeasurementSchema> schemaForAlignedSeries,
+      ChunkHeader header,
+      PageHeader pageHeader,
+      ByteBuffer pageData,
+      Decoder valueDecoder,
+      List<long[]> timeBatch,
+      int pageIndex,
+      List<TsPrimitiveType> valueList) {
+    if (!addSchema) {
+      schemaForAlignedSeries.add(
+          new MeasurementSchema(
+              header.getMeasurementID(),
+              header.getDataType(),
+              header.getEncodingType(),
+              header.getCompressionType()));
+    }
- List<Modification> measurementModifications = new ArrayList<>();
- Iterator<Modification> modsIterator = modifications.listIterator();
- Deletion currentDeletion;
- while (modsIterator.hasNext()) {
- currentDeletion = (Deletion) modsIterator.next();
- // if deletion path match the chunkPath, then add the deletion to the list
- if (currentDeletion.getPath().matchFullPath(new PartialPath(path.getFullPath()))) {
- measurementModifications.add(currentDeletion);
+    ValuePageReader valuePageReader =
+        new ValuePageReader(pageHeader, pageData, header.getDataType(), valueDecoder);
+    // The value page is decoded against the timestamps of the time page with the same index.
+    TsPrimitiveType[] valueBatch = valuePageReader.nextValueBatch(timeBatch.get(pageIndex));
+    valueList.addAll(Arrays.asList(valueBatch));
+  }
+
+  /**
+   * Decodes one time page of an aligned chunk, stores its timestamp batch in {@code timeBatch} and
+   * appends every timestamp of the page at {@code pageIndex} to {@code timeForAlignedSeries}.
+   * Nothing is written to IoTDB here; the timestamps are only collected.
+   *
+   * <p>NOTE(review): when {@code addSchema} is {@code false} the time chunk's own header is
+   * registered as a measurement schema - confirm that the consumer of {@code
+   * schemaForAlignedSeries} expects an entry for the time column.
+   *
+   * @param addSchema {@code true} if the schema of this chunk has already been registered
+   * @param schemaForAlignedSeries receives a schema built from {@code header} when {@code
+   *     addSchema} is {@code false}
+   * @param header header of the chunk the page belongs to
+   * @param pageHeader header of the page to decode
+   * @param pageData page payload as returned by the sequence reader
+   * @param defaultTimeDecoder decoder used for the timestamps
+   * @param timeBatch receives the decoded timestamp array of this page
+   * @param pageIndex position in {@code timeBatch} whose timestamps are copied out
+   * @param timeForAlignedSeries receives the individual timestamps
+   * @throws IOException if the time page cannot be decoded
+   */
+  private static void readAndSendTimePage(
+      boolean addSchema,
+      List<MeasurementSchema> schemaForAlignedSeries,
+      ChunkHeader header,
+      PageHeader pageHeader,
+      ByteBuffer pageData,
+      Decoder defaultTimeDecoder,
+      List<long[]> timeBatch,
+      int pageIndex,
+      List<Long> timeForAlignedSeries)
+      throws IOException {
+    if (!addSchema) {
+      MeasurementSchema chunkSchema =
+          new MeasurementSchema(
+              header.getMeasurementID(),
+              header.getDataType(),
+              header.getEncodingType(),
+              header.getCompressionType());
+      schemaForAlignedSeries.add(chunkSchema);
+    }
+    TimePageReader timePageReader = new TimePageReader(pageHeader, pageData, defaultTimeDecoder);
+    timeBatch.add(timePageReader.getNextTimeBatch());
+    // Look the batch up by page index, exactly like the value pages do later on.
+    long[] pageTimes = timeBatch.get(pageIndex);
+    for (long time : pageTimes) {
+      timeForAlignedSeries.add(time);
+    }
+  }
+
+  /**
+   * Decodes one page of a non-aligned series and writes its points to IoTDB with {@link
+   * Session#insertTablet}, in batches of at most {@code MAX_TABLET_LENGTH} rows.
+   *
+   * @param currentDevice device the chunk belongs to; used as the tablet's device
+   * @param header header of the chunk the page belongs to; supplies measurement id, data type,
+   *     encoding and compression for the tablet schema
+   * @param pageData page payload as returned by the sequence reader
+   * @param valueDecoder decoder matching the chunk's encoding and data type
+   * @param defaultTimeDecoder decoder used for the timestamps
+   * @param session IoTDB session the tablets are sent through
+   * @throws IOException if the page cannot be decoded
+   * @throws IoTDBConnectionException if sending a tablet fails at the connection level
+   * @throws StatementExecutionException if the server rejects a tablet
+   */
+  private static void readAndSendSingleSeriesPage(
+      String currentDevice,
+      ChunkHeader header,
+      ByteBuffer pageData,
+      Decoder valueDecoder,
+      Decoder defaultTimeDecoder,
+      Session session)
+      throws IOException, IoTDBConnectionException, StatementExecutionException {
+    String measurementId = header.getMeasurementID();
+    Tablet tablet =
+        new Tablet(
+            currentDevice,
+            Collections.singletonList(
+                new MeasurementSchema(
+                    measurementId,
+                    header.getDataType(),
+                    header.getEncodingType(),
+                    header.getCompressionType())),
+            MAX_TABLET_LENGTH);
+    // The trailing null is presumably "no filter", i.e. every point of the page is returned.
+    PageReader pageReader =
+        new PageReader(pageData, header.getDataType(), valueDecoder, defaultTimeDecoder, null);
+    BatchData batchData = pageReader.getAllSatisfiedPageData();
+    while (batchData.hasCurrent()) {
+      // Go through addTimestamp/addValue so the value lands in this measurement's column at the
+      // current row; indexing tablet.values by row number writes to the wrong slot.
+      tablet.addTimestamp(tablet.rowSize, batchData.currentTime());
+      tablet.addValue(measurementId, tablet.rowSize, batchData.currentValue());
+      tablet.rowSize++;
+      if (tablet.rowSize >= MAX_TABLET_LENGTH) {
+        session.insertTablet(tablet);
+        tablet.reset();
+      }
+      batchData.next();
+    }
- if (!isAligned) {
- QueryUtils.modifyChunkMetaData(chunkMetadataList, measurementModifications);
- } else {
- List<AlignedChunkMetadata> alignedChunkMetadataList = new ArrayList<>();
- for (IChunkMetadata chunkMetadata : chunkMetadataList) {
- alignedChunkMetadataList.add((AlignedChunkMetadata) chunkMetadata);
+    // Flush the rows left over from the last, partially filled batch.
+    if (tablet.rowSize > 0) {
+      session.insertTablet(tablet);
+    }
+  }
+
+  /**
+   * Replays the deletions recorded next to a TsFile. If the file {@code filename +
+   * ModificationFile.FILE_SUFFIX} exists, every modification in it is turned into a {@code delete
+   * from ... where time >= ... and time <= ...} statement and executed through the session.
+   * Does nothing when no such file exists.
+   *
+   * @param filename path of the TsFile whose modification file should be applied
+   * @param session IoTDB session the delete statements are executed on
+   * @throws IoTDBConnectionException if a statement cannot be sent
+   * @throws StatementExecutionException if the server rejects a statement
+   */
+  private static void writeModification(String filename, Session session)
+      throws IoTDBConnectionException, StatementExecutionException {
+    String modsPath = filename + ModificationFile.FILE_SUFFIX;
+    if (!FSFactoryProducer.getFSFactory().getFile(modsPath).exists()) {
+      return;
+    }
+    List<Modification> modifications =
+        (List<Modification>) new ModificationFile(modsPath).getModifications();
+    for (Modification modification : modifications) {
+      String device = modification.getDevice();
+      String measurement = modification.getMeasurement();
+      // Every entry is treated as a Deletion; any other modification type fails the cast here.
+      Deletion deletion = (Deletion) modification;
+      long startTime = deletion.getStartTime();
+      long endTime = deletion.getEndTime();
+      session.executeNonQueryStatement(
+          String.format(
+              "delete from %s.%s where time >= %d and time <= %d",
+              device, measurement, startTime, endTime));
- // AlignedChunk only contains one valueChunkMetadata which is measurement with this path
- QueryUtils.modifyAlignedChunkMetaData(
- alignedChunkMetadataList, Collections.singletonList(measurementModifications));
     }
   }