You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/05/27 01:10:02 UTC
[iotdb] branch master updated: Improve aligned write in tsfile api (#6021)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new b535b278dc Improve aligned write in tsfile api (#6021)
b535b278dc is described below
commit b535b278dc0d71bde001c23e8eefdb3f60f6881c
Author: Jackie Tien <ja...@gmail.com>
AuthorDate: Fri May 27 09:09:57 2022 +0800
Improve aligned write in tsfile api (#6021)
---
.../write/chunk/AlignedChunkGroupWriterImpl.java | 85 +++++++++++++++-------
.../iotdb/tsfile/write/chunk/ValueChunkWriter.java | 2 +-
2 files changed, 61 insertions(+), 26 deletions(-)
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/AlignedChunkGroupWriterImpl.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/AlignedChunkGroupWriterImpl.java
index 79d9ed271e..9a43b55b7b 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/AlignedChunkGroupWriterImpl.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/AlignedChunkGroupWriterImpl.java
@@ -38,11 +38,11 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.stream.Collectors;
public class AlignedChunkGroupWriterImpl implements IChunkGroupWriter {
private static final Logger LOG = LoggerFactory.getLogger(AlignedChunkGroupWriterImpl.class);
@@ -50,11 +50,9 @@ public class AlignedChunkGroupWriterImpl implements IChunkGroupWriter {
private final String deviceId;
// measurementID -> ValueChunkWriter
- private Map<String, ValueChunkWriter> valueChunkWriterMap = new LinkedHashMap<>();
+ private final Map<String, ValueChunkWriter> valueChunkWriterMap = new LinkedHashMap<>();
- private TimeChunkWriter timeChunkWriter;
-
- private Set<String> writenMeasurementSet = new HashSet<>();
+ private final TimeChunkWriter timeChunkWriter;
private long lastTime = -1;
@@ -103,10 +101,16 @@ public class AlignedChunkGroupWriterImpl implements IChunkGroupWriter {
@Override
public int write(long time, List<DataPoint> data) throws WriteProcessException, IOException {
- checkIsHistoryData("", time);
-
+ checkIsHistoryData(time);
+ List<ValueChunkWriter> emptyValueChunkWriters = new ArrayList<>();
+ Set<String> existingMeasurements =
+ data.stream().map(DataPoint::getMeasurementId).collect(Collectors.toSet());
+ for (Map.Entry<String, ValueChunkWriter> entry : valueChunkWriterMap.entrySet()) {
+ if (!existingMeasurements.contains(entry.getKey())) {
+ emptyValueChunkWriters.add(entry.getValue());
+ }
+ }
for (DataPoint point : data) {
- writenMeasurementSet.add(point.getMeasurementId());
boolean isNull = point.getValue() == null;
ValueChunkWriter valueChunkWriter = valueChunkWriterMap.get(point.getMeasurementId());
switch (point.getType()) {
@@ -133,7 +137,9 @@ public class AlignedChunkGroupWriterImpl implements IChunkGroupWriter {
String.format("Data type %s is not supported.", point.getType()));
}
}
- writeEmptyDataInOneRow(time);
+ if (!emptyValueChunkWriters.isEmpty()) {
+ writeEmptyDataInOneRow(emptyValueChunkWriters);
+ }
timeChunkWriter.write(time);
lastTime = time;
if (checkPageSizeAndMayOpenANewPage()) {
@@ -146,18 +152,25 @@ public class AlignedChunkGroupWriterImpl implements IChunkGroupWriter {
public int write(Tablet tablet) throws WriteProcessException, IOException {
int pointCount = 0;
List<MeasurementSchema> measurementSchemas = tablet.getSchemas();
+ List<ValueChunkWriter> emptyValueChunkWriters = new ArrayList<>();
+ Set<String> existingMeasurements =
+ measurementSchemas.stream()
+ .map(MeasurementSchema::getMeasurementId)
+ .collect(Collectors.toSet());
+ for (Map.Entry<String, ValueChunkWriter> entry : valueChunkWriterMap.entrySet()) {
+ if (!existingMeasurements.contains(entry.getKey())) {
+ emptyValueChunkWriters.add(entry.getValue());
+ }
+ }
for (int row = 0; row < tablet.rowSize; row++) {
long time = tablet.timestamps[row];
- checkIsHistoryData("", time);
+ checkIsHistoryData(time);
for (int columnIndex = 0; columnIndex < measurementSchemas.size(); columnIndex++) {
- writenMeasurementSet.add(measurementSchemas.get(columnIndex).getMeasurementId());
- boolean isNull = false;
+ boolean isNull =
+ tablet.bitMaps != null
+ && tablet.bitMaps[columnIndex] != null
+ && tablet.bitMaps[columnIndex].isMarked(row);
// check isNull by bitMap in tablet
- if (tablet.bitMaps != null
- && tablet.bitMaps[columnIndex] != null
- && tablet.bitMaps[columnIndex].isMarked(row)) {
- isNull = true;
- }
ValueChunkWriter valueChunkWriter =
valueChunkWriterMap.get(measurementSchemas.get(columnIndex).getMeasurementId());
switch (measurementSchemas.get(columnIndex).getType()) {
@@ -186,7 +199,9 @@ public class AlignedChunkGroupWriterImpl implements IChunkGroupWriter {
measurementSchemas.get(columnIndex).getType()));
}
}
- writeEmptyDataInOneRow(time);
+ if (!emptyValueChunkWriters.isEmpty()) {
+ writeEmptyDataInOneRow(emptyValueChunkWriters);
+ }
timeChunkWriter.write(time);
lastTime = time;
if (checkPageSizeAndMayOpenANewPage()) {
@@ -241,13 +256,33 @@ public class AlignedChunkGroupWriterImpl implements IChunkGroupWriter {
}
}
- private void writeEmptyDataInOneRow(long time) {
- for (Map.Entry<String, ValueChunkWriter> entry : valueChunkWriterMap.entrySet()) {
- if (!writenMeasurementSet.contains(entry.getKey())) {
- entry.getValue().write(time, 0, true);
+ private void writeEmptyDataInOneRow(List<ValueChunkWriter> valueChunkWriterList) {
+ for (ValueChunkWriter valueChunkWriter : valueChunkWriterList) {
+ TSDataType dataType = valueChunkWriter.getDataType();
+ switch (dataType) {
+ case BOOLEAN:
+ valueChunkWriter.write(-1, false, true);
+ break;
+ case INT32:
+ valueChunkWriter.write(-1, 0, true);
+ break;
+ case INT64:
+ valueChunkWriter.write(-1, 0L, true);
+ break;
+ case FLOAT:
+ valueChunkWriter.write(-1, 0.0f, true);
+ break;
+ case DOUBLE:
+ valueChunkWriter.write(-1, 0.0d, true);
+ break;
+ case TEXT:
+ valueChunkWriter.write(-1, null, true);
+ break;
+ default:
+ throw new UnSupportedDataTypeException(
+ String.format("Data type %s is not supported.", dataType));
}
}
- writenMeasurementSet.clear();
}
/**
@@ -280,13 +315,13 @@ public class AlignedChunkGroupWriterImpl implements IChunkGroupWriter {
}
}
- private void checkIsHistoryData(String measurementId, long time) throws WriteProcessException {
+ private void checkIsHistoryData(long time) throws WriteProcessException {
if (time <= lastTime) {
throw new WriteProcessException(
"Not allowed to write out-of-order data in timeseries "
+ deviceId
+ TsFileConstant.PATH_SEPARATOR
- + measurementId
+ + ""
+ ", time should later than "
+ lastTime);
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ValueChunkWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ValueChunkWriter.java
index 3ece00b403..e89edc4407 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ValueChunkWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ValueChunkWriter.java
@@ -217,7 +217,7 @@ public class ValueChunkWriter {
// Empty chunk, it may happen if pageBuffer stores empty bits and only chunk header will be
// flushed.
if (statistics.getCount() == 0) {
- return ChunkHeader.getSerializedSize(measurementId, pageBuffer.size());
+ return ChunkHeader.getSerializedSize(measurementId, 0);
}
// return the serialized size of the chunk header + all pages