You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ma...@apache.org on 2022/10/17 06:49:11 UTC
[iotdb] branch master updated: [IOTDB-4650] Support starting reading from tail in RewriteTsFileTool (#7604)
This is an automated email from the ASF dual-hosted git repository.
marklau99 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 4eaa59103c [IOTDB-4650] Support starting reading from tail in RewriteTsFileTool (#7604)
4eaa59103c is described below
commit 4eaa59103c2ad877611eade6f68673a185837aa4
Author: Liu Xuxin <37...@users.noreply.github.com>
AuthorDate: Mon Oct 17 14:49:04 2022 +0800
[IOTDB-4650] Support starting reading from tail in RewriteTsFileTool (#7604)
---
.../java/org/apache/iotdb/RewriteTsFileTool.java | 249 ++++++++++++++++++++-
1 file changed, 240 insertions(+), 9 deletions(-)
diff --git a/rewrite-tsfile-tool/src/main/java/org/apache/iotdb/RewriteTsFileTool.java b/rewrite-tsfile-tool/src/main/java/org/apache/iotdb/RewriteTsFileTool.java
index 653f2de67c..4b1d47976d 100644
--- a/rewrite-tsfile-tool/src/main/java/org/apache/iotdb/RewriteTsFileTool.java
+++ b/rewrite-tsfile-tool/src/main/java/org/apache/iotdb/RewriteTsFileTool.java
@@ -21,9 +21,12 @@ package org.apache.iotdb;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.engine.cache.ChunkCache;
+import org.apache.iotdb.db.engine.compaction.inner.utils.MultiTsFileDeviceIterator;
import org.apache.iotdb.db.engine.modification.Deletion;
import org.apache.iotdb.db.engine.modification.Modification;
import org.apache.iotdb.db.engine.modification.ModificationFile;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.utils.QueryUtils;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
@@ -35,11 +38,16 @@ import org.apache.iotdb.tsfile.file.MetaMarker;
import org.apache.iotdb.tsfile.file.header.ChunkGroupHeader;
import org.apache.iotdb.tsfile.file.header.ChunkHeader;
import org.apache.iotdb.tsfile.file.metadata.AlignedChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
+import org.apache.iotdb.tsfile.read.TimeValuePair;
+import org.apache.iotdb.tsfile.read.TsFileAlignedSeriesReaderIterator;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
+import org.apache.iotdb.tsfile.read.common.Chunk;
import org.apache.iotdb.tsfile.read.common.Field;
+import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.common.RowRecord;
import org.apache.iotdb.tsfile.read.controller.CachedChunkLoaderImpl;
@@ -48,11 +56,18 @@ import org.apache.iotdb.tsfile.read.controller.IMetadataQuerier;
import org.apache.iotdb.tsfile.read.controller.MetadataQuerierByFileImpl;
import org.apache.iotdb.tsfile.read.query.dataset.DataSetWithoutTimeGenerator;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+import org.apache.iotdb.tsfile.read.reader.IChunkReader;
+import org.apache.iotdb.tsfile.read.reader.IPointReader;
+import org.apache.iotdb.tsfile.read.reader.chunk.AlignedChunkReader;
+import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
import org.apache.iotdb.tsfile.read.reader.series.AbstractFileSeriesReader;
import org.apache.iotdb.tsfile.read.reader.series.EmptyFileSeriesReader;
import org.apache.iotdb.tsfile.read.reader.series.FileSeriesReader;
import org.apache.iotdb.tsfile.utils.FilePathUtils;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
import org.apache.iotdb.tsfile.write.record.Tablet;
+import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.apache.commons.cli.CommandLine;
@@ -66,9 +81,11 @@ import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -81,6 +98,7 @@ public class RewriteTsFileTool {
private static String user = "root";
private static String password = "root";
private static String filePath = "";
+ private static String readMode = "s";
private static Map<String, Set<MeasurementSchema>> device2Measurements;
@@ -119,6 +137,7 @@ public class RewriteTsFileTool {
user = getArgOrDefault(commandLine, "u", user);
password = getArgOrDefault(commandLine, "pw", password);
filePath = getArgOrDefault(commandLine, "f", filePath);
+ readMode = getArgOrDefault(commandLine, "rm", readMode);
} catch (ParseException e) {
System.out.printf("Parse Args Error. %s%n", e.getMessage());
priHelp(options);
@@ -175,6 +194,15 @@ public class RewriteTsFileTool {
.required()
.build();
options.addOption(filePathOpt);
+
+ Option readModeOpt =
+ Option.builder("rm")
+ .argName("readMode")
+ .hasArg()
+ .desc("Read mode, s(equence) or r(everse)")
+ .required()
+ .build();
+ options.addOption(readModeOpt);
return options;
}
@@ -207,21 +235,37 @@ public class RewriteTsFileTool {
List<File> unloadTsFiles = new ArrayList<>();
System.out.printf("Collect TsFiles successfully, %d files to be loaded.%n", size);
System.out.println("Start Loading TsFiles...");
- for (int i = 0; i < size; i++) {
- File file = files.get(i);
- System.out.printf("Loading %s(%d/%d)...", file.getPath(), i + 1, size);
+ if (readMode.equals("s")) {
+ for (int i = 0; i < size; i++) {
+ File file = files.get(i);
+ System.out.printf("Loading %s(%d/%d)...", file.getPath(), i + 1, size);
+ try {
+ seqWriteTsFile(file.getPath(), session);
+ } catch (Exception e) {
+ System.out.println(
+ "------------------------------Error Message------------------------------");
+ e.printStackTrace();
+ System.out.println(
+ "------------------------------End Message------------------------------");
+ unloadTsFiles.add(file);
+ continue;
+ }
+ System.out.println("Done");
+ }
+ } else {
try {
- writeTsFile(file.getPath(), session);
- } catch (Exception e) {
+ reverseWriteTsFile(files, session);
+ } catch (IOException
+ | IllegalPathException
+ | IoTDBConnectionException
+ | StatementExecutionException
+ | NoMeasurementException e) {
System.out.println(
"------------------------------Error Message------------------------------");
e.printStackTrace();
System.out.println(
"------------------------------End Message------------------------------");
- unloadTsFiles.add(file);
- continue;
}
- System.out.println("Done");
}
System.out.println("Finish Loading TsFiles");
System.out.printf(
@@ -262,7 +306,7 @@ public class RewriteTsFileTool {
* @param filename the file path to be loaded
* @param session IoTDB session
*/
- public static void writeTsFile(String filename, Session session)
+ public static void seqWriteTsFile(String filename, Session session)
throws IOException, IllegalPathException, IoTDBConnectionException,
StatementExecutionException, NoMeasurementException {
// parse modifications from .mods
@@ -425,4 +469,191 @@ public class RewriteTsFileTool {
alignedChunkMetadataList, Collections.singletonList(measurementModifications));
}
}
+
+  /**
+   * Loads a batch of TsFiles into IoTDB by walking their metadata (device by device, series by
+   * series) through a {@link MultiTsFileDeviceIterator}, reading the chunk metadata first and then
+   * the chunks it points to, instead of scanning each file sequentially from its head.
+   *
+   * <p>Broken data is skipped rather than aborting the whole load. A failure while writing an
+   * aligned device skips the remainder of that device; a failure on a non-aligned chunk skips that
+   * chunk only (see {@link #writeSingleSeries}). In both cases the cause is printed.
+   *
+   * <p>NOTE(review): unlike {@link #seqWriteTsFile}, which parses the .mods file, this path does not
+   * apply modifications — confirm whether that is intended.
+   *
+   * @param files the TsFiles to load
+   * @param session IoTDB session the data is written through
+   * @throws IOException if file metadata cannot be read outside the per-device/per-chunk skipping
+   * @throws IllegalPathException if a device and measurement do not form a valid path
+   * @throws IoTDBConnectionException declared by the signature for session failures
+   * @throws StatementExecutionException declared by the signature for session failures
+   * @throws NoMeasurementException declared by the signature; not thrown by the visible body
+   */
+  public static void reverseWriteTsFile(List<File> files, Session session)
+      throws IOException, IllegalPathException, IoTDBConnectionException,
+          StatementExecutionException, NoMeasurementException {
+    List<TsFileResource> resources = new ArrayList<>(files.size());
+    for (File file : files) {
+      resources.add(new TsFileResource(file));
+    }
+    try (MultiTsFileDeviceIterator deviceIterator = new MultiTsFileDeviceIterator(resources)) {
+      while (deviceIterator.hasNextDevice()) {
+        Pair<String, Boolean> devicePair = deviceIterator.nextDevice();
+        String device = devicePair.left;
+        boolean isAligned = devicePair.right;
+        if (isAligned) {
+          try {
+            writeAlignedSeries(device, deviceIterator, session);
+          } catch (Throwable t) {
+            // Caught broadly, presumably because corrupted bytes can fail in arbitrary ways.
+            // The rest of this aligned device is skipped; report why, then continue.
+            System.out.printf("Skip broken aligned device %s: %s%n", device, t);
+          }
+        } else {
+          MultiTsFileDeviceIterator.MeasurementIterator seriesIterator =
+              deviceIterator.iterateNotAlignedSeries(device, true);
+          while (seriesIterator.hasNextSeries()) {
+            writeSingleSeries(device, seriesIterator, session);
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Writes every chunk of the next non-aligned series produced by {@code seriesIterator} into
+   * IoTDB, file by file in the order the iterator returned them.
+   *
+   * <p>A chunk that fails to be read or written is treated as broken. It is skipped, the failure
+   * is printed, and the remaining chunks of the series are still processed.
+   *
+   * @param device the device the series belongs to
+   * @param seriesIterator iterator over the device's series; advanced by one series here
+   * @param session IoTDB session the data is written through
+   * @throws IllegalPathException if {@code device} and the measurement do not form a valid path
+   */
+  protected static void writeSingleSeries(
+      String device, MultiTsFileDeviceIterator.MeasurementIterator seriesIterator, Session session)
+      throws IllegalPathException {
+    PartialPath path = new PartialPath(device, seriesIterator.nextSeries());
+    LinkedList<Pair<TsFileSequenceReader, List<ChunkMetadata>>> readerAndChunkMetadataList =
+        seriesIterator.getMetadataListForCurrentSeries();
+    // Entries are popped rather than iterated, so a file's metadata is unreferenced once written.
+    while (!readerAndChunkMetadataList.isEmpty()) {
+      Pair<TsFileSequenceReader, List<ChunkMetadata>> readerMetadataPair =
+          readerAndChunkMetadataList.removeFirst();
+      TsFileSequenceReader reader = readerMetadataPair.left;
+      for (ChunkMetadata chunkMetadata : readerMetadataPair.right) {
+        try {
+          writeSingleChunk(device, path, chunkMetadata, reader, session);
+        } catch (Throwable t) {
+          // this is a broken chunk: skip it, but keep the cause so it can be diagnosed
+          System.out.printf(
+              "Skip broken chunk in device %s.%s: %s%n", device, path.getMeasurement(), t);
+        }
+      }
+    }
+  }
+
+  /**
+   * Reads one non-aligned chunk and writes its points to IoTDB in tablets of at most {@code
+   * MAX_TABLET_LENGTH} rows.
+   *
+   * <p>The tablet schema takes its data type, encoding and compression from the chunk header. The
+   * measurement name comes from {@code p}.
+   *
+   * @param device device the series belongs to
+   * @param p path of the series; only its measurement name is used here
+   * @param chunkMetadata metadata locating the chunk in the file
+   * @param reader reader of the file that contains the chunk
+   * @param session IoTDB session the data is written through
+   * @throws IOException if the chunk cannot be read
+   * @throws IoTDBConnectionException propagated from {@code session.insertTablet}
+   * @throws StatementExecutionException propagated from {@code session.insertTablet}
+   */
+  protected static void writeSingleChunk(
+      String device,
+      PartialPath p,
+      ChunkMetadata chunkMetadata,
+      TsFileSequenceReader reader,
+      Session session)
+      throws IOException, IoTDBConnectionException, StatementExecutionException {
+    Chunk chunk = reader.readMemChunk(chunkMetadata);
+    ChunkHeader chunkHeader = chunk.getHeader();
+    String measurementId = p.getMeasurement();
+    MeasurementSchema schema =
+        new MeasurementSchema(
+            measurementId,
+            chunkHeader.getDataType(),
+            chunkHeader.getEncodingType(),
+            chunkHeader.getCompressionType());
+    Tablet tablet = new Tablet(device, Collections.singletonList(schema), MAX_TABLET_LENGTH);
+    IChunkReader chunkReader = new ChunkReader(chunk, null);
+    while (chunkReader.hasNextSatisfiedPage()) {
+      IPointReader pointReader = chunkReader.nextPageData().getBatchDataIterator();
+      while (pointReader.hasNextTimeValuePair()) {
+        TimeValuePair timeValuePair = pointReader.nextTimeValuePair();
+        // Tablet.values holds one primitive array per measurement (values[column][row]). Go
+        // through the Tablet helpers instead of writing the boxed TsPrimitiveType into values[row].
+        int row = tablet.rowSize++;
+        tablet.addTimestamp(row, timeValuePair.getTimestamp());
+        tablet.addValue(measurementId, row, timeValuePair.getValue().getValue());
+        if (tablet.rowSize >= MAX_TABLET_LENGTH) {
+          session.insertTablet(tablet);
+          tablet.reset();
+        }
+      }
+    }
+    if (tablet.rowSize > 0) {
+      session.insertTablet(tablet);
+      tablet.reset();
+    }
+  }
+
+  /**
+   * Collects the value-column schemas of an aligned device across all files: one schema per
+   * measurement, sorted by measurement id.
+   *
+   * <p>For each measurement, the data type, encoding and compression come from the header of the
+   * first value chunk found for it. Later chunks of the same measurement are not inspected.
+   * {@code null} entries in a value chunk metadata list are ignored.
+   *
+   * @param readerAndChunkMetadataList per-file readers paired with their aligned chunk metadata;
+   *     not modified
+   * @return the value-column schemas, sorted by measurement id
+   * @throws IOException if a chunk cannot be read
+   */
+  private static List<IMeasurementSchema> collectSchemaFromAlignedChunkMetadataList(
+      LinkedList<Pair<TsFileSequenceReader, List<AlignedChunkMetadata>>> readerAndChunkMetadataList)
+      throws IOException {
+    // keyed by measurement uid; the first header seen for a measurement wins
+    Map<String, IMeasurementSchema> schemaByMeasurement = new HashMap<>();
+    for (Pair<TsFileSequenceReader, List<AlignedChunkMetadata>> readerListPair :
+        readerAndChunkMetadataList) {
+      // Read through the reader this metadata belongs to, rather than the process-wide
+      // ChunkCache singleton, so the tool does not route one-shot reads through that cache.
+      TsFileSequenceReader reader = readerListPair.left;
+      for (AlignedChunkMetadata alignedChunkMetadata : readerListPair.right) {
+        for (IChunkMetadata chunkMetadata : alignedChunkMetadata.getValueChunkMetadataList()) {
+          if (chunkMetadata == null
+              || schemaByMeasurement.containsKey(chunkMetadata.getMeasurementUid())) {
+            continue;
+          }
+          ChunkHeader header = reader.readMemChunk((ChunkMetadata) chunkMetadata).getHeader();
+          schemaByMeasurement.put(
+              chunkMetadata.getMeasurementUid(),
+              new MeasurementSchema(
+                  header.getMeasurementID(),
+                  header.getDataType(),
+                  header.getEncodingType(),
+                  header.getCompressionType()));
+        }
+      }
+    }
+    List<IMeasurementSchema> schemaList = new ArrayList<>(schemaByMeasurement.values());
+    schemaList.sort(Comparator.comparing(IMeasurementSchema::getMeasurementId));
+    return schemaList;
+  }
+
+  /**
+   * Writes the aligned device that {@code deviceIterator} is currently positioned on into IoTDB.
+   *
+   * <p>The value-column schemas are gathered across all files first. Every file's chunks are then
+   * read and written against that same column list. Files are handled in the order the iterator
+   * returned them, and each entry is removed from the list as it is handed over.
+   *
+   * @param device the aligned device being written
+   * @param deviceIterator iterator currently positioned on {@code device}
+   * @param session IoTDB session the data is written through
+   * @throws IOException if chunk data cannot be read
+   * @throws IoTDBConnectionException if a session write fails
+   * @throws StatementExecutionException if a session write fails
+   */
+  protected static void writeAlignedSeries(
+      String device, MultiTsFileDeviceIterator deviceIterator, Session session)
+      throws IOException, IoTDBConnectionException, StatementExecutionException {
+    LinkedList<Pair<TsFileSequenceReader, List<AlignedChunkMetadata>>> pendingFiles =
+        deviceIterator.getReaderAndChunkMetadataForCurrentAlignedSeries();
+    List<IMeasurementSchema> valueSchemas = collectSchemaFromAlignedChunkMetadataList(pendingFiles);
+
+    // writeAlignedChunk (and the Tablet it builds) takes the concrete MeasurementSchema type.
+    List<MeasurementSchema> tabletSchemas = new ArrayList<>();
+    for (IMeasurementSchema valueSchema : valueSchemas) {
+      tabletSchemas.add((MeasurementSchema) valueSchema);
+    }
+
+    while (!pendingFiles.isEmpty()) {
+      Pair<TsFileSequenceReader, List<AlignedChunkMetadata>> current = pendingFiles.removeFirst();
+      TsFileSequenceReader fileReader = current.left;
+      List<AlignedChunkMetadata> fileChunks = current.right;
+      writeAlignedChunk(
+          new TsFileAlignedSeriesReaderIterator(fileReader, fileChunks, valueSchemas),
+          device,
+          tabletSchemas,
+          session);
+    }
+  }
+
+  /**
+   * Drains {@code readerIterator} and writes every aligned row to IoTDB in tablets of at most
+   * {@code MAX_TABLET_LENGTH} rows. Buffered rows are flushed at the end of each chunk.
+   *
+   * @param readerIterator yields one aligned chunk reader per aligned chunk
+   * @param device the aligned device being written
+   * @param schemaList value-column schemas. Row values are mapped to columns by position, so this
+   *     is assumed to be in the same order as the schema list {@code readerIterator} was built
+   *     with (TODO confirm)
+   * @param session IoTDB session the data is written through
+   * @throws IOException if chunk data cannot be read
+   * @throws IoTDBConnectionException propagated from {@code session.insertAlignedTablet}
+   * @throws StatementExecutionException propagated from {@code session.insertAlignedTablet}
+   */
+  private static void writeAlignedChunk(
+      TsFileAlignedSeriesReaderIterator readerIterator,
+      String device,
+      List<MeasurementSchema> schemaList,
+      Session session)
+      throws IOException, IoTDBConnectionException, StatementExecutionException {
+    while (readerIterator.hasNext()) {
+      Tablet tablet = new Tablet(device, schemaList, MAX_TABLET_LENGTH);
+      Pair<AlignedChunkReader, Long> chunkReaderAndChunkSize = readerIterator.nextReader();
+      AlignedChunkReader alignedChunkReader = chunkReaderAndChunkSize.left;
+      while (alignedChunkReader.hasNextSatisfiedPage()) {
+        IBatchDataIterator batchDataIterator =
+            alignedChunkReader.nextPageData().getBatchDataIterator();
+        while (batchDataIterator.hasNext()) {
+          // One entry per value column; a null entry means that column has no value here.
+          // Tablet.values holds one primitive array per column (values[column][row]), so each
+          // entry is written to its own column rather than storing the whole row at values[row].
+          TsPrimitiveType[] rowValues = (TsPrimitiveType[]) batchDataIterator.currentValue();
+          int row = tablet.rowSize++;
+          tablet.addTimestamp(row, batchDataIterator.currentTime());
+          for (int column = 0; column < schemaList.size(); column++) {
+            TsPrimitiveType cell = rowValues[column];
+            tablet.addValue(
+                schemaList.get(column).getMeasurementId(),
+                row,
+                cell == null ? null : cell.getValue());
+          }
+          batchDataIterator.next();
+          if (tablet.rowSize >= MAX_TABLET_LENGTH) {
+            session.insertAlignedTablet(tablet);
+            tablet.reset();
+          }
+        }
+      }
+      if (tablet.rowSize > 0) {
+        session.insertAlignedTablet(tablet);
+        tablet.reset();
+      }
+    }
+  }
}