You are viewing a plain text version of this content. The canonical link to the original was not preserved in this plain text copy.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2022/11/18 11:32:31 UTC
[iotdb] 01/09: improve aligned tsfile write api
This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch improvedAlign_for_expr
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 29325681d2b477fb6b23c5f6d26b3e2008c24177
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Thu May 26 10:04:47 2022 +0800
improve aligned tsfile write api
---
.../write/chunk/AlignedChunkGroupWriterImpl.java | 220 ++++++++++++++++-----
.../iotdb/tsfile/write/chunk/TimeChunkWriter.java | 4 +
.../iotdb/tsfile/write/chunk/ValueChunkWriter.java | 4 +
3 files changed, 184 insertions(+), 44 deletions(-)
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/AlignedChunkGroupWriterImpl.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/AlignedChunkGroupWriterImpl.java
index 79d9ed271e..20e0e50c53 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/AlignedChunkGroupWriterImpl.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/AlignedChunkGroupWriterImpl.java
@@ -43,6 +43,7 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.stream.Collectors;
public class AlignedChunkGroupWriterImpl implements IChunkGroupWriter {
private static final Logger LOG = LoggerFactory.getLogger(AlignedChunkGroupWriterImpl.class);
@@ -144,57 +145,188 @@ public class AlignedChunkGroupWriterImpl implements IChunkGroupWriter {
@Override
public int write(Tablet tablet) throws WriteProcessException, IOException {
- int pointCount = 0;
- List<MeasurementSchema> measurementSchemas = tablet.getSchemas();
+ // write time
for (int row = 0; row < tablet.rowSize; row++) {
long time = tablet.timestamps[row];
checkIsHistoryData("", time);
- for (int columnIndex = 0; columnIndex < measurementSchemas.size(); columnIndex++) {
- writenMeasurementSet.add(measurementSchemas.get(columnIndex).getMeasurementId());
- boolean isNull = false;
- // check isNull by bitMap in tablet
- if (tablet.bitMaps != null
- && tablet.bitMaps[columnIndex] != null
- && tablet.bitMaps[columnIndex].isMarked(row)) {
- isNull = true;
- }
- ValueChunkWriter valueChunkWriter =
- valueChunkWriterMap.get(measurementSchemas.get(columnIndex).getMeasurementId());
- switch (measurementSchemas.get(columnIndex).getType()) {
- case BOOLEAN:
- valueChunkWriter.write(time, ((boolean[]) tablet.values[columnIndex])[row], isNull);
- break;
- case INT32:
- valueChunkWriter.write(time, ((int[]) tablet.values[columnIndex])[row], isNull);
- break;
- case INT64:
- valueChunkWriter.write(time, ((long[]) tablet.values[columnIndex])[row], isNull);
- break;
- case FLOAT:
- valueChunkWriter.write(time, ((float[]) tablet.values[columnIndex])[row], isNull);
- break;
- case DOUBLE:
- valueChunkWriter.write(time, ((double[]) tablet.values[columnIndex])[row], isNull);
- break;
- case TEXT:
- valueChunkWriter.write(time, ((Binary[]) tablet.values[columnIndex])[row], isNull);
- break;
- default:
- throw new UnSupportedDataTypeException(
- String.format(
- "Data type %s is not supported.",
- measurementSchemas.get(columnIndex).getType()));
- }
- }
- writeEmptyDataInOneRow(time);
timeChunkWriter.write(time);
lastTime = time;
- if (checkPageSizeAndMayOpenANewPage()) {
- writePageToPageBuffer();
+ if (timeChunkWriter.needANewPage()) {
+ timeChunkWriter.writePageToPageBuffer();
}
- pointCount++;
}
- return pointCount;
+
+ List<MeasurementSchema> measurementSchemas = tablet.getSchemas();
+ // write existed values
+ for (int columnIndex = 0; columnIndex < measurementSchemas.size(); columnIndex++) {
+ ValueChunkWriter valueChunkWriter =
+ valueChunkWriterMap.get(measurementSchemas.get(columnIndex).getMeasurementId());
+ TSDataType dataType = measurementSchemas.get(columnIndex).getType();
+ switch (dataType) {
+ case BOOLEAN:
+ boolean[] booleanValues = (boolean[]) tablet.values[columnIndex];
+ for (int row = 0; row < tablet.rowSize; row++) {
+ long time = tablet.timestamps[row];
+ boolean isNull =
+ tablet.bitMaps != null
+ && tablet.bitMaps[columnIndex] != null
+ && tablet.bitMaps[columnIndex].isMarked(row);
+ // check isNull by bitMap in tablet
+ valueChunkWriter.write(time, booleanValues[row], isNull);
+ if (valueChunkWriter.needANewPage()) {
+ valueChunkWriter.writePageToPageBuffer();
+ }
+ }
+ break;
+ case INT32:
+ int[] intValues = (int[]) tablet.values[columnIndex];
+ for (int row = 0; row < tablet.rowSize; row++) {
+ long time = tablet.timestamps[row];
+ boolean isNull =
+ tablet.bitMaps != null
+ && tablet.bitMaps[columnIndex] != null
+ && tablet.bitMaps[columnIndex].isMarked(row);
+ // check isNull by bitMap in tablet
+ valueChunkWriter.write(time, intValues[row], isNull);
+ if (valueChunkWriter.needANewPage()) {
+ valueChunkWriter.writePageToPageBuffer();
+ }
+ }
+ break;
+ case INT64:
+ long[] longValues = (long[]) tablet.values[columnIndex];
+ for (int row = 0; row < tablet.rowSize; row++) {
+ long time = tablet.timestamps[row];
+ boolean isNull =
+ tablet.bitMaps != null
+ && tablet.bitMaps[columnIndex] != null
+ && tablet.bitMaps[columnIndex].isMarked(row);
+ // check isNull by bitMap in tablet
+ valueChunkWriter.write(time, longValues[row], isNull);
+ if (valueChunkWriter.needANewPage()) {
+ valueChunkWriter.writePageToPageBuffer();
+ }
+ }
+ break;
+ case FLOAT:
+ float[] floatValues = (float[]) tablet.values[columnIndex];
+ for (int row = 0; row < tablet.rowSize; row++) {
+ long time = tablet.timestamps[row];
+ boolean isNull =
+ tablet.bitMaps != null
+ && tablet.bitMaps[columnIndex] != null
+ && tablet.bitMaps[columnIndex].isMarked(row);
+ // check isNull by bitMap in tablet
+ valueChunkWriter.write(time, floatValues[row], isNull);
+ if (valueChunkWriter.needANewPage()) {
+ valueChunkWriter.writePageToPageBuffer();
+ }
+ }
+ break;
+ case DOUBLE:
+ double[] doubleValues = (double[]) tablet.values[columnIndex];
+ for (int row = 0; row < tablet.rowSize; row++) {
+ long time = tablet.timestamps[row];
+ boolean isNull =
+ tablet.bitMaps != null
+ && tablet.bitMaps[columnIndex] != null
+ && tablet.bitMaps[columnIndex].isMarked(row);
+ // check isNull by bitMap in tablet
+ valueChunkWriter.write(time, doubleValues[row], isNull);
+ if (valueChunkWriter.needANewPage()) {
+ valueChunkWriter.writePageToPageBuffer();
+ }
+ }
+ break;
+ case TEXT:
+ Binary[] binaryValues = (Binary[]) tablet.values[columnIndex];
+ for (int row = 0; row < tablet.rowSize; row++) {
+ long time = tablet.timestamps[row];
+ boolean isNull =
+ tablet.bitMaps != null
+ && tablet.bitMaps[columnIndex] != null
+ && tablet.bitMaps[columnIndex].isMarked(row);
+ // check isNull by bitMap in tablet
+ valueChunkWriter.write(time, binaryValues[row], isNull);
+ if (valueChunkWriter.needANewPage()) {
+ valueChunkWriter.writePageToPageBuffer();
+ }
+ }
+ break;
+ default:
+ throw new UnSupportedDataTypeException(
+ String.format("Data type %s is not supported.", dataType));
+ }
+ }
+
+ if (measurementSchemas.size() != valueChunkWriterMap.size()) {
+ Set<String> existingMeasurements =
+ measurementSchemas.stream()
+ .map(MeasurementSchema::getMeasurementId)
+ .collect(Collectors.toSet());
+ // write non-existed values
+ for (Map.Entry<String, ValueChunkWriter> entry : valueChunkWriterMap.entrySet()) {
+ if (!existingMeasurements.contains(entry.getKey())) {
+ ValueChunkWriter valueChunkWriter = entry.getValue();
+ TSDataType dataType = valueChunkWriter.getDataType();
+ switch (dataType) {
+ case BOOLEAN:
+ for (int row = 0; row < tablet.rowSize; row++) {
+ valueChunkWriter.write(-1, false, true);
+ if (valueChunkWriter.needANewPage()) {
+ valueChunkWriter.writePageToPageBuffer();
+ }
+ }
+ break;
+ case INT32:
+ for (int row = 0; row < tablet.rowSize; row++) {
+ valueChunkWriter.write(-1, 0, true);
+ if (valueChunkWriter.needANewPage()) {
+ valueChunkWriter.writePageToPageBuffer();
+ }
+ }
+ break;
+ case INT64:
+ for (int row = 0; row < tablet.rowSize; row++) {
+ valueChunkWriter.write(-1, 0L, true);
+ if (valueChunkWriter.needANewPage()) {
+ valueChunkWriter.writePageToPageBuffer();
+ }
+ }
+ break;
+ case FLOAT:
+ for (int row = 0; row < tablet.rowSize; row++) {
+ valueChunkWriter.write(-1, 0.0f, true);
+ if (valueChunkWriter.needANewPage()) {
+ valueChunkWriter.writePageToPageBuffer();
+ }
+ }
+ break;
+ case DOUBLE:
+ for (int row = 0; row < tablet.rowSize; row++) {
+ valueChunkWriter.write(-1, 0.0, true);
+ if (valueChunkWriter.needANewPage()) {
+ valueChunkWriter.writePageToPageBuffer();
+ }
+ }
+ break;
+ case TEXT:
+ for (int row = 0; row < tablet.rowSize; row++) {
+ valueChunkWriter.write(-1, null, true);
+ if (valueChunkWriter.needANewPage()) {
+ valueChunkWriter.writePageToPageBuffer();
+ }
+ }
+ break;
+ default:
+ throw new UnSupportedDataTypeException(
+ String.format("Data type %s is not supported.", dataType));
+ }
+ }
+ }
+ }
+
+ return tablet.rowSize;
}
@Override
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/TimeChunkWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/TimeChunkWriter.java
index b96f1a09ec..bd37840d79 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/TimeChunkWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/TimeChunkWriter.java
@@ -136,6 +136,10 @@ public class TimeChunkWriter {
return false;
}
+ public boolean needANewPage() {
+ return pageWriter.getPointNumber() >= maxNumberOfPointsInPage;
+ }
+
public void writePageToPageBuffer() {
try {
if (numOfPages == 0) { // record the firstPageStatistics
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ValueChunkWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ValueChunkWriter.java
index 3ece00b403..7a143c9f95 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ValueChunkWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ValueChunkWriter.java
@@ -252,6 +252,10 @@ public class ValueChunkWriter {
return false;
}
+ public boolean needANewPage() {
+ return pageWriter.getSize() >= maxNumberOfPointsInPage;
+ }
+
public void sealCurrentPage() {
// if the page contains no points, we still need to serialize it
if (pageWriter != null && pageWriter.getSize() != 0) {