You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2022/08/12 03:14:33 UTC
[iotdb] branch rel/0.13 updated: [To rel/0.13][IOTDB-4057]Support aligned timeseries in rewriteTsFileTool (#6972)
This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch rel/0.13
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/0.13 by this push:
new 865711ccae [To rel/0.13][IOTDB-4057]Support aligned timeseries in rewriteTsFileTool (#6972)
865711ccae is described below
commit 865711ccaec355686966cdf685dc6b29c9bcf913
Author: Mrquan <50...@users.noreply.github.com>
AuthorDate: Fri Aug 12 11:14:28 2022 +0800
[To rel/0.13][IOTDB-4057]Support aligned timeseries in rewriteTsFileTool (#6972)
---
.../java/org/apache/iotdb/RewriteFileTool.java | 195 ++++++++++++++++++++-
.../apache/iotdb/tsfile/write/record/Tablet.java | 63 +++++++
2 files changed, 255 insertions(+), 3 deletions(-)
diff --git a/rewriteFileTool/src/main/java/org/apache/iotdb/RewriteFileTool.java b/rewriteFileTool/src/main/java/org/apache/iotdb/RewriteFileTool.java
index 6ad579b025..df0805844e 100644
--- a/rewriteFileTool/src/main/java/org/apache/iotdb/RewriteFileTool.java
+++ b/rewriteFileTool/src/main/java/org/apache/iotdb/RewriteFileTool.java
@@ -43,8 +43,10 @@ import org.apache.iotdb.tsfile.fileSystem.fsFactory.FSFactory;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.apache.iotdb.tsfile.read.common.BatchData;
import org.apache.iotdb.tsfile.read.common.TimeRange;
+import org.apache.iotdb.tsfile.read.reader.page.AlignedPageReader;
import org.apache.iotdb.tsfile.read.reader.page.PageReader;
import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
import org.apache.iotdb.tsfile.write.record.Tablet;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
@@ -55,6 +57,7 @@ import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
@@ -195,12 +198,69 @@ public class RewriteFileTool {
byte marker;
String curDevice = null;
long chunkHeaderOffset = -1;
+
+ // Used for rewriting aligned chunk group
+ List<PageHeader> timePageHeaders = new ArrayList<>();
+ List<ByteBuffer> timePageDatas = new ArrayList<>();
+ List<List<PageHeader>> valuePageHeadersList = new ArrayList<>();
+ List<List<ByteBuffer>> valuePageDatasList = new ArrayList<>();
+ List<MeasurementSchema> measurementSchemas = new ArrayList<>();
+ List<List<TimeRange>> deleteIntervalsList = new ArrayList<>();
+
while ((marker = reader.readMarker()) != MetaMarker.SEPARATOR) {
switch (marker) {
+ case MetaMarker.TIME_CHUNK_HEADER:
+ case MetaMarker.ONLY_ONE_PAGE_TIME_CHUNK_HEADER:
+ ChunkHeader header = reader.readChunkHeader(marker);
+ int dataSize = header.getDataSize();
+ while (dataSize > 0) {
+ PageHeader pageHeader =
+ reader.readPageHeader(
+ header.getDataType(), header.getChunkType() == MetaMarker.CHUNK_HEADER);
+ ByteBuffer pageData = reader.readPage(pageHeader, header.getCompressionType());
+ timePageHeaders.add(pageHeader);
+ timePageDatas.add(pageData);
+ dataSize -= pageHeader.getSerializedPageSize();
+ }
+ break;
+ case MetaMarker.VALUE_CHUNK_HEADER:
+ case MetaMarker.ONLY_ONE_PAGE_VALUE_CHUNK_HEADER:
+ chunkHeaderOffset = reader.position() - 1;
+ header = reader.readChunkHeader(marker);
+ MeasurementSchema measurementSchema =
+ new MeasurementSchema(
+ header.getMeasurementID(),
+ header.getDataType(),
+ header.getEncodingType(),
+ header.getCompressionType());
+ measurementSchemas.add(measurementSchema);
+
+ // read delete time range from old modification file
+ deleteIntervalsList.add(
+ getOldSortedDeleteIntervals(
+ curDevice, measurementSchema, chunkHeaderOffset, modifications));
+
+ dataSize = header.getDataSize();
+ int pageIndex = 0;
+ while (dataSize > 0) {
+ PageHeader pageHeader =
+ reader.readPageHeader(
+ header.getDataType(), header.getChunkType() == MetaMarker.CHUNK_HEADER);
+ ByteBuffer pageData = reader.readPage(pageHeader, header.getCompressionType());
+ if (valuePageHeadersList.size() == pageIndex) {
+ valuePageHeadersList.add(new ArrayList<>());
+ valuePageDatasList.add(new ArrayList<>());
+ }
+ valuePageHeadersList.get(pageIndex).add(pageHeader);
+ valuePageDatasList.get(pageIndex).add(pageData);
+ pageIndex++;
+ dataSize -= pageHeader.getSerializedPageSize();
+ }
+ break;
case MetaMarker.CHUNK_HEADER:
case MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER:
chunkHeaderOffset = reader.position() - 1;
- ChunkHeader header = reader.readChunkHeader(marker);
+ header = reader.readChunkHeader(marker);
Decoder defaultTimeDecoder =
Decoder.getDecoderByType(
TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder()),
@@ -209,14 +269,14 @@ public class RewriteFileTool {
Decoder.getDecoderByType(header.getEncodingType(), header.getDataType());
// 1. construct MeasurementSchema from chunkHeader
String measurement = header.getMeasurementID();
- MeasurementSchema measurementSchema =
+ measurementSchema =
new MeasurementSchema(
measurement,
header.getDataType(),
header.getEncodingType(),
header.getCompressionType());
// 2. record data point of each measurement
- int dataSize = header.getDataSize();
+ dataSize = header.getDataSize();
while (dataSize > 0) {
valueDecoder.reset();
PageHeader pageHeader =
@@ -282,10 +342,32 @@ public class RewriteFileTool {
}
break;
case MetaMarker.CHUNK_GROUP_HEADER:
+ if (!timePageHeaders.isEmpty()) {
+ rewriteAlignedChunkGroup(
+ timePageHeaders,
+ timePageDatas,
+ valuePageHeadersList,
+ valuePageDatasList,
+ measurementSchemas,
+ deleteIntervalsList,
+ curDevice,
+ session);
+ }
ChunkGroupHeader chunkGroupHeader = reader.readChunkGroupHeader();
curDevice = chunkGroupHeader.getDeviceID();
break;
case MetaMarker.OPERATION_INDEX_RANGE:
+ if (!timePageHeaders.isEmpty()) {
+ rewriteAlignedChunkGroup(
+ timePageHeaders,
+ timePageDatas,
+ valuePageHeadersList,
+ valuePageDatasList,
+ measurementSchemas,
+ deleteIntervalsList,
+ curDevice,
+ session);
+ }
reader.readPlanIndex();
reader.getMinPlanIndex();
reader.getMaxPlanIndex();
@@ -294,6 +376,17 @@ public class RewriteFileTool {
MetaMarker.handleUnexpectedMarker(marker);
}
}
+ if (!timePageHeaders.isEmpty()) {
+ rewriteAlignedChunkGroup(
+ timePageHeaders,
+ timePageDatas,
+ valuePageHeadersList,
+ valuePageDatasList,
+ measurementSchemas,
+ deleteIntervalsList,
+ curDevice,
+ session);
+ }
} catch (IllegalPathException
| IOException
| IoTDBConnectionException
@@ -304,6 +397,102 @@ public class RewriteFileTool {
}
}
+ /**
+  * Replays one buffered aligned chunk group through the given session and then empties the
+  * buffers so they can collect the next chunk group.
+  *
+  * <p>For every buffered time page an {@link AlignedPageReader} is built over that page and the
+  * value pages stored at the same page index, the delete intervals are handed to the reader, and
+  * the resulting rows are copied into a {@link Tablet}. The tablet is sent with {@code
+  * session.insertAlignedTablet} whenever its estimated size reaches {@code MAX_TABLET_SIZE}, and
+  * once more at the end of the page if rows remain.
+  *
+  * @param timePageHeaders page headers of the time chunk, one per page
+  * @param timePageDatas page payloads of the time chunk, parallel to {@code timePageHeaders}
+  * @param valuePageHeadersList value page headers, outer index = page, inner index = measurement
+  * @param valuePageDatasList value page payloads, same layout as {@code valuePageHeadersList}
+  * @param measurementSchemas schemas of the value columns, in chunk order
+  * @param deleteIntervalsList delete intervals per measurement, parallel to the schemas
+  * @param curDevice device the chunk group belongs to
+  * @param session session used to insert the rebuilt tablets
+  * @throws IOException if decoding a page fails
+  * @throws IoTDBConnectionException if the session cannot reach the server
+  * @throws StatementExecutionException if the server rejects an insert
+  */
+ private static void rewriteAlignedChunkGroup(
+     List<PageHeader> timePageHeaders,
+     List<ByteBuffer> timePageDatas,
+     List<List<PageHeader>> valuePageHeadersList,
+     List<List<ByteBuffer>> valuePageDatasList,
+     List<MeasurementSchema> measurementSchemas,
+     List<List<TimeRange>> deleteIntervalsList,
+     String curDevice,
+     Session session)
+     throws IOException, IoTDBConnectionException, StatementExecutionException {
+   Decoder defaultTimeDecoder =
+       Decoder.getDecoderByType(
+           TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder()),
+           TSDataType.INT64);
+
+   List<Decoder> valueDecoders = new ArrayList<>();
+   List<TSDataType> dataTypes = new ArrayList<>();
+   for (MeasurementSchema measurementSchema : measurementSchemas) {
+     Decoder valueDecoder =
+         Decoder.getDecoderByType(
+             measurementSchema.getEncodingType(), measurementSchema.getType());
+     valueDecoders.add(valueDecoder);
+     dataTypes.add(measurementSchema.getType());
+   }
+
+   for (int pageIndex = 0; pageIndex < timePageHeaders.size(); pageIndex++) {
+     AlignedPageReader alignedPageReader =
+         new AlignedPageReader(
+             timePageHeaders.get(pageIndex),
+             timePageDatas.get(pageIndex),
+             defaultTimeDecoder,
+             valuePageHeadersList.get(pageIndex),
+             valuePageDatasList.get(pageIndex),
+             dataTypes,
+             valueDecoders,
+             null);
+
+     alignedPageReader.setDeleteIntervalList(deleteIntervalsList);
+     BatchData batchData = alignedPageReader.getAllSatisfiedPageData();
+     int maxRow = batchData.length();
+     Tablet tablet = new Tablet(curDevice, measurementSchemas, maxRow);
+
+     long curTabletSize = 0;
+     while (batchData.hasCurrent()) {
+       // An aligned row is an array with one entry per measurement. Read it once and reuse it
+       // for both the tablet and the size estimate; casting the row itself to Binary (as the
+       // size estimate used to do for TEXT columns) fails with ClassCastException.
+       TsPrimitiveType[] rowValues = (TsPrimitiveType[]) batchData.currentValue();
+       tablet.addTimestamp(tablet.rowSize, batchData.currentTime());
+       tablet.addValues(tablet.rowSize, rowValues);
+       tablet.rowSize++;
+       curTabletSize += estimateAlignedRowSize(dataTypes, rowValues);
+
+       if (curTabletSize >= MAX_TABLET_SIZE) {
+         session.insertAlignedTablet(tablet);
+         curTabletSize = 0;
+         tablet.reset();
+       }
+       batchData.next();
+     }
+     if (tablet.rowSize > 0) {
+       session.insertAlignedTablet(tablet);
+     }
+
+     // Decoders are shared across pages, so clear their state before the next page.
+     for (Decoder decoder : valueDecoders) {
+       decoder.reset();
+     }
+     defaultTimeDecoder.reset();
+   }
+
+   // The lists are owned by the caller's read loop; empty them for the next chunk group.
+   timePageHeaders.clear();
+   timePageDatas.clear();
+   valuePageHeadersList.clear();
+   valuePageDatasList.clear();
+   measurementSchemas.clear();
+   deleteIntervalsList.clear();
+ }
+
+ /**
+  * Estimates how many bytes one aligned row adds to a tablet: 8 for the timestamp plus a
+  * per-column amount chosen by data type.
+  *
+  * @param dataTypes data type of each column, in column order
+  * @param rowValues values of the row, parallel to {@code dataTypes}; entries may be null
+  * @return the estimated size of the row in bytes
+  * @throws UnSupportedDataTypeException if a column has a type not listed in the switch
+  */
+ private static long estimateAlignedRowSize(
+     List<TSDataType> dataTypes, TsPrimitiveType[] rowValues) {
+   long rowSize = 8;
+   for (int column = 0; column < dataTypes.size(); column++) {
+     TSDataType dataType = dataTypes.get(column);
+     switch (dataType) {
+       case BOOLEAN:
+         rowSize += 1;
+         break;
+       case INT32:
+       case FLOAT:
+         rowSize += 4;
+         break;
+       case INT64:
+       case DOUBLE:
+         rowSize += 8;
+         break;
+       case TEXT:
+         // 4 bytes are always counted; a null cell contributes no payload bytes.
+         rowSize +=
+             4 + (rowValues[column] == null ? 0 : rowValues[column].getBinary().getLength());
+         break;
+       default:
+         throw new UnSupportedDataTypeException(
+             String.format("Data type %s is not supported.", dataType));
+     }
+   }
+   return rowSize;
+ }
+
private static boolean checkArgs(String[] args) {
String paramConfig =
"-b=[path of backUp directory] -vf=[path of validation file]/-f=[path of tsfile list] -o=[path of output log] -u=[username, default=\"root\"] -pw=[password, default=\"root\"]";
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/Tablet.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/Tablet.java
index bb66da22a9..72e055da5c 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/Tablet.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/record/Tablet.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.utils.BitMap;
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import java.util.ArrayList;
@@ -121,12 +122,74 @@ public class Tablet {
timestamps[rowIndex] = timestamp;
}
+ public void addValues(int rowIndex, TsPrimitiveType[] values) {
+ for (int i = 0; i < schemas.size(); i++) {
+ MeasurementSchema measurementSchema = schemas.get(i);
+ addValueOfDataType(measurementSchema.getType(), rowIndex, i, values[i]);
+ }
+ }
+
/**
 * Stores a single value in the column of the named measurement at {@code rowIndex}.
 *
 * @param measurementId measurement whose column receives the value
 * @param rowIndex row to write
 * @param value value to store
 */
public void addValue(String measurementId, int rowIndex, Object value) {
  int schemaPosition = measurementIndex.get(measurementId);
  TSDataType columnType = schemas.get(schemaPosition).getType();
  addValueOfDataType(columnType, rowIndex, schemaPosition, value);
}
+ private void addValueOfDataType(
+ TSDataType dataType, int rowIndex, int indexOfSchema, TsPrimitiveType value) {
+ if (value == null) {
+ // init the bitMap to mark null value
+ if (bitMaps == null) {
+ bitMaps = new BitMap[values.length];
+ }
+ if (bitMaps[indexOfSchema] == null) {
+ bitMaps[indexOfSchema] = new BitMap(maxRowNumber);
+ }
+ // mark the null value position
+ bitMaps[indexOfSchema].mark(rowIndex);
+ }
+ switch (dataType) {
+ case TEXT:
+ {
+ Binary[] sensor = (Binary[]) values[indexOfSchema];
+ sensor[rowIndex] = value != null ? value.getBinary() : Binary.EMPTY_VALUE;
+ break;
+ }
+ case FLOAT:
+ {
+ float[] sensor = (float[]) values[indexOfSchema];
+ sensor[rowIndex] = value != null ? value.getFloat() : Float.MIN_VALUE;
+ break;
+ }
+ case INT32:
+ {
+ int[] sensor = (int[]) values[indexOfSchema];
+ sensor[rowIndex] = value != null ? value.getInt() : Integer.MIN_VALUE;
+ break;
+ }
+ case INT64:
+ {
+ long[] sensor = (long[]) values[indexOfSchema];
+ sensor[rowIndex] = value != null ? value.getLong() : Long.MIN_VALUE;
+ break;
+ }
+ case DOUBLE:
+ {
+ double[] sensor = (double[]) values[indexOfSchema];
+ sensor[rowIndex] = value != null ? value.getDouble() : Double.MIN_VALUE;
+ break;
+ }
+ case BOOLEAN:
+ {
+ boolean[] sensor = (boolean[]) values[indexOfSchema];
+ sensor[rowIndex] = value != null && value.getBoolean();
+ break;
+ }
+ default:
+ throw new UnSupportedDataTypeException(String.format(NOT_SUPPORT_DATATYPE, dataType));
+ }
+ }
+
private void addValueOfDataType(
TSDataType dataType, int rowIndex, int indexOfSchema, Object value) {