You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/05/26 02:22:12 UTC
[iotdb] 01/01: Fix aligned write bug in tsfile api
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch AlignedTsFileAPIBug
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit a008b576aec03baa54debc408641680bec7be6e1
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Thu May 26 10:21:57 2022 +0800
Fix aligned write bug in tsfile api
---
.../write/chunk/AlignedChunkGroupWriterImpl.java | 85 +++++++++++++++-------
1 file changed, 60 insertions(+), 25 deletions(-)
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/AlignedChunkGroupWriterImpl.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/AlignedChunkGroupWriterImpl.java
index 79d9ed271e..9a43b55b7b 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/AlignedChunkGroupWriterImpl.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/AlignedChunkGroupWriterImpl.java
@@ -38,11 +38,11 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.stream.Collectors;
public class AlignedChunkGroupWriterImpl implements IChunkGroupWriter {
private static final Logger LOG = LoggerFactory.getLogger(AlignedChunkGroupWriterImpl.class);
@@ -50,11 +50,9 @@ public class AlignedChunkGroupWriterImpl implements IChunkGroupWriter {
private final String deviceId;
// measurementID -> ValueChunkWriter
- private Map<String, ValueChunkWriter> valueChunkWriterMap = new LinkedHashMap<>();
+ private final Map<String, ValueChunkWriter> valueChunkWriterMap = new LinkedHashMap<>();
- private TimeChunkWriter timeChunkWriter;
-
- private Set<String> writenMeasurementSet = new HashSet<>();
+ private final TimeChunkWriter timeChunkWriter;
private long lastTime = -1;
@@ -103,10 +101,16 @@ public class AlignedChunkGroupWriterImpl implements IChunkGroupWriter {
@Override
public int write(long time, List<DataPoint> data) throws WriteProcessException, IOException {
- checkIsHistoryData("", time);
-
+ checkIsHistoryData(time);
+ List<ValueChunkWriter> emptyValueChunkWriters = new ArrayList<>();
+ Set<String> existingMeasurements =
+ data.stream().map(DataPoint::getMeasurementId).collect(Collectors.toSet());
+ for (Map.Entry<String, ValueChunkWriter> entry : valueChunkWriterMap.entrySet()) {
+ if (!existingMeasurements.contains(entry.getKey())) {
+ emptyValueChunkWriters.add(entry.getValue());
+ }
+ }
for (DataPoint point : data) {
- writenMeasurementSet.add(point.getMeasurementId());
boolean isNull = point.getValue() == null;
ValueChunkWriter valueChunkWriter = valueChunkWriterMap.get(point.getMeasurementId());
switch (point.getType()) {
@@ -133,7 +137,9 @@ public class AlignedChunkGroupWriterImpl implements IChunkGroupWriter {
String.format("Data type %s is not supported.", point.getType()));
}
}
- writeEmptyDataInOneRow(time);
+ if (!emptyValueChunkWriters.isEmpty()) {
+ writeEmptyDataInOneRow(emptyValueChunkWriters);
+ }
timeChunkWriter.write(time);
lastTime = time;
if (checkPageSizeAndMayOpenANewPage()) {
@@ -146,18 +152,25 @@ public class AlignedChunkGroupWriterImpl implements IChunkGroupWriter {
public int write(Tablet tablet) throws WriteProcessException, IOException {
int pointCount = 0;
List<MeasurementSchema> measurementSchemas = tablet.getSchemas();
+ List<ValueChunkWriter> emptyValueChunkWriters = new ArrayList<>();
+ Set<String> existingMeasurements =
+ measurementSchemas.stream()
+ .map(MeasurementSchema::getMeasurementId)
+ .collect(Collectors.toSet());
+ for (Map.Entry<String, ValueChunkWriter> entry : valueChunkWriterMap.entrySet()) {
+ if (!existingMeasurements.contains(entry.getKey())) {
+ emptyValueChunkWriters.add(entry.getValue());
+ }
+ }
for (int row = 0; row < tablet.rowSize; row++) {
long time = tablet.timestamps[row];
- checkIsHistoryData("", time);
+ checkIsHistoryData(time);
for (int columnIndex = 0; columnIndex < measurementSchemas.size(); columnIndex++) {
- writenMeasurementSet.add(measurementSchemas.get(columnIndex).getMeasurementId());
- boolean isNull = false;
+ boolean isNull =
+ tablet.bitMaps != null
+ && tablet.bitMaps[columnIndex] != null
+ && tablet.bitMaps[columnIndex].isMarked(row);
// check isNull by bitMap in tablet
- if (tablet.bitMaps != null
- && tablet.bitMaps[columnIndex] != null
- && tablet.bitMaps[columnIndex].isMarked(row)) {
- isNull = true;
- }
ValueChunkWriter valueChunkWriter =
valueChunkWriterMap.get(measurementSchemas.get(columnIndex).getMeasurementId());
switch (measurementSchemas.get(columnIndex).getType()) {
@@ -186,7 +199,9 @@ public class AlignedChunkGroupWriterImpl implements IChunkGroupWriter {
measurementSchemas.get(columnIndex).getType()));
}
}
- writeEmptyDataInOneRow(time);
+ if (!emptyValueChunkWriters.isEmpty()) {
+ writeEmptyDataInOneRow(emptyValueChunkWriters);
+ }
timeChunkWriter.write(time);
lastTime = time;
if (checkPageSizeAndMayOpenANewPage()) {
@@ -241,13 +256,33 @@ public class AlignedChunkGroupWriterImpl implements IChunkGroupWriter {
}
}
- private void writeEmptyDataInOneRow(long time) {
- for (Map.Entry<String, ValueChunkWriter> entry : valueChunkWriterMap.entrySet()) {
- if (!writenMeasurementSet.contains(entry.getKey())) {
- entry.getValue().write(time, 0, true);
+ private void writeEmptyDataInOneRow(List<ValueChunkWriter> valueChunkWriterList) {
+ for (ValueChunkWriter valueChunkWriter : valueChunkWriterList) {
+ TSDataType dataType = valueChunkWriter.getDataType();
+ switch (dataType) {
+ case BOOLEAN:
+ valueChunkWriter.write(-1, false, true);
+ break;
+ case INT32:
+ valueChunkWriter.write(-1, 0, true);
+ break;
+ case INT64:
+ valueChunkWriter.write(-1, 0L, true);
+ break;
+ case FLOAT:
+ valueChunkWriter.write(-1, 0.0f, true);
+ break;
+ case DOUBLE:
+ valueChunkWriter.write(-1, 0.0d, true);
+ break;
+ case TEXT:
+ valueChunkWriter.write(-1, null, true);
+ break;
+ default:
+ throw new UnSupportedDataTypeException(
+ String.format("Data type %s is not supported.", dataType));
}
}
- writenMeasurementSet.clear();
}
/**
@@ -280,13 +315,13 @@ public class AlignedChunkGroupWriterImpl implements IChunkGroupWriter {
}
}
- private void checkIsHistoryData(String measurementId, long time) throws WriteProcessException {
+ private void checkIsHistoryData(long time) throws WriteProcessException {
if (time <= lastTime) {
throw new WriteProcessException(
"Not allowed to write out-of-order data in timeseries "
+ deviceId
+ TsFileConstant.PATH_SEPARATOR
- + measurementId
+ + ""
+ ", time should later than "
+ lastTime);
}