You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/05/31 01:07:27 UTC
[iotdb] branch master updated: [IOTDB-3229] Field is 0 rather than null when query two series in one device (#6071)
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 88adfbea69 [IOTDB-3229] Field is 0 rather than null when query two series in one device (#6071)
88adfbea69 is described below
commit 88adfbea697931b1d03cd83394491fe77cf4f960
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Tue May 31 09:07:22 2022 +0800
[IOTDB-3229] Field is 0 rather than null when query two series in one device (#6071)
---
.../execution/operator/process/FilterOperator.java | 80 ++++++++++------------
.../operator/process/TransformOperator.java | 70 +++++++++++--------
.../iotdb/tsfile/read/common/block/TsBlock.java | 3 +-
3 files changed, 79 insertions(+), 74 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FilterOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FilterOperator.java
index b9050c27ad..cf04e891ce 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FilterOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FilterOperator.java
@@ -33,6 +33,9 @@ import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.io.IOException;
import java.time.ZoneId;
import java.util.ArrayList;
@@ -41,6 +44,8 @@ import java.util.Map;
public class FilterOperator extends TransformOperator {
+ private static final Logger LOGGER = LoggerFactory.getLogger(FilterOperator.class);
+
private LayerPointReader filterPointReader;
public FilterOperator(
@@ -90,18 +95,6 @@ public class FilterOperator extends TransformOperator {
}
}
- @Override
- protected void readyForFirstIteration() throws QueryProcessException, IOException {
- iterateFilterReaderToNextValid();
- }
-
- private void iterateFilterReaderToNextValid() throws QueryProcessException, IOException {
- do {
- filterPointReader.readyForNext();
- } while (filterPointReader.next()
- && (filterPointReader.isCurrentNull() || !filterPointReader.currentBoolean()));
- }
-
@Override
public TsBlock next() {
final TsBlockBuilder tsBlockBuilder = TsBlockBuilder.createWithOnlyTimeColumn();
@@ -121,56 +114,57 @@ public class FilterOperator extends TransformOperator {
try {
int rowCount = 0;
- while (rowCount < FETCH_SIZE && filterPointReader.next()) {
- final long currentTime = filterPointReader.currentTime();
- boolean hasAtLeastOneValid = false;
- for (int i = 0; i < outputColumnCount; ++i) {
- if (currentTime == iterateValueReadersToNextValid(transformers[i], currentTime)) {
- hasAtLeastOneValid = true;
+ while (rowCount < FETCH_SIZE && !timeHeap.isEmpty()) {
+ final long currentTime = timeHeap.pollFirst();
+
+ if (filterPointReader.next() && filterPointReader.currentTime() == currentTime) {
+ if (!filterPointReader.isCurrentNull() && filterPointReader.currentBoolean()) {
+ // time
+ timeBuilder.writeLong(currentTime);
+
+ // values
+ for (int i = 0; i < outputColumnCount; ++i) {
+ collectDataPointAndIterateToNextValid(
+ transformers[i], columnBuilders[i], currentTime);
+ }
+
+ ++rowCount;
+ } else {
+ // values
+ for (int i = 0; i < outputColumnCount; ++i) {
+ skipDataPointAndIterateToNextValid(transformers[i], currentTime);
+ }
}
- }
- if (hasAtLeastOneValid) {
- timeBuilder.writeLong(currentTime);
+ filterPointReader.readyForNext();
+ iterateReaderToNextValid(filterPointReader);
+ } else {
+ // values
for (int i = 0; i < outputColumnCount; ++i) {
- collectDataPoint(transformers[i], columnBuilders[i], currentTime);
+ skipDataPointAndIterateToNextValid(transformers[i], currentTime);
}
- ++rowCount;
}
- iterateFilterReaderToNextValid();
-
inputLayer.updateRowRecordListEvictionUpperBound();
}
tsBlockBuilder.declarePositions(rowCount);
} catch (Exception e) {
+ LOGGER.error("FilterOperator#next()", e);
throw new RuntimeException(e);
}
return tsBlockBuilder.build();
}
- private long iterateValueReadersToNextValid(LayerPointReader reader, long currentTime)
- throws QueryProcessException, IOException {
- while (reader.next() && (reader.isCurrentNull() || reader.currentTime() < currentTime)) {
- reader.readyForNext();
+ private void skipDataPointAndIterateToNextValid(LayerPointReader reader, long currentTime)
+ throws IOException, QueryProcessException {
+ if (!reader.next() || reader.currentTime() != currentTime) {
+ return;
}
- return reader.currentTime();
- }
- @Override
- public boolean hasNext() {
- try {
- if (isFirstIteration) {
- readyForFirstIteration();
- isFirstIteration = false;
- }
-
- return filterPointReader.next();
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
+ reader.readyForNext();
+ iterateReaderToNextValid(reader);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TransformOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TransformOperator.java
index c66d3d3c98..b36c5cd308 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TransformOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TransformOperator.java
@@ -42,6 +42,8 @@ import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
import com.google.common.util.concurrent.ListenableFuture;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.time.ZoneId;
@@ -51,6 +53,8 @@ import java.util.Map;
public class TransformOperator implements ProcessOperator {
+ private static final Logger LOGGER = LoggerFactory.getLogger(TransformOperator.class);
+
// TODO: make it configurable
protected static final int FETCH_SIZE = 10000;
@@ -142,7 +146,7 @@ public class TransformOperator implements ProcessOperator {
}
}
- private void iterateReaderToNextValid(LayerPointReader reader)
+ protected void iterateReaderToNextValid(LayerPointReader reader)
throws QueryProcessException, IOException {
// Since a constant operand is not allowed to be a result column, the reader will not be
// a ConstantLayerPointReader.
@@ -158,11 +162,12 @@ public class TransformOperator implements ProcessOperator {
}
@Override
- public boolean hasNext() {
+ public final boolean hasNext() {
if (isFirstIteration) {
try {
readyForFirstIteration();
} catch (Exception e) {
+ LOGGER.error("TransformOperator#hasNext()", e);
throw new RuntimeException(e);
}
isFirstIteration = false;
@@ -197,9 +202,7 @@ public class TransformOperator implements ProcessOperator {
// values
for (int i = 0; i < columnCount; ++i) {
- LayerPointReader reader = transformers[i];
- collectDataPoint(reader, columnBuilders[i], currentTime);
- iterateReaderToNextValid(reader);
+ collectDataPointAndIterateToNextValid(transformers[i], columnBuilders[i], currentTime);
}
++rowCount;
@@ -209,45 +212,52 @@ public class TransformOperator implements ProcessOperator {
tsBlockBuilder.declarePositions(rowCount);
} catch (Exception e) {
+ LOGGER.error("TransformOperator#next()", e);
throw new RuntimeException(e);
}
return tsBlockBuilder.build();
}
- protected void collectDataPoint(LayerPointReader reader, ColumnBuilder writer, long currentTime)
+ protected void collectDataPointAndIterateToNextValid(
+ LayerPointReader reader, ColumnBuilder writer, long currentTime)
throws QueryProcessException, IOException {
- if (!reader.next() || reader.currentTime() != currentTime || reader.isCurrentNull()) {
+ if (!reader.next() || reader.currentTime() != currentTime) {
writer.appendNull();
return;
}
- TSDataType type = reader.getDataType();
- switch (type) {
- case INT32:
- writer.writeInt(reader.currentInt());
- break;
- case INT64:
- writer.writeLong(reader.currentLong());
- break;
- case FLOAT:
- writer.writeFloat(reader.currentFloat());
- break;
- case DOUBLE:
- writer.writeDouble(reader.currentDouble());
- break;
- case BOOLEAN:
- writer.writeBoolean(reader.currentBoolean());
- break;
- case TEXT:
- writer.writeBinary(reader.currentBinary());
- break;
- default:
- throw new UnSupportedDataTypeException(
- String.format("Data type %s is not supported.", type));
+ if (reader.isCurrentNull()) {
+ writer.appendNull();
+ } else {
+ TSDataType type = reader.getDataType();
+ switch (type) {
+ case INT32:
+ writer.writeInt(reader.currentInt());
+ break;
+ case INT64:
+ writer.writeLong(reader.currentLong());
+ break;
+ case FLOAT:
+ writer.writeFloat(reader.currentFloat());
+ break;
+ case DOUBLE:
+ writer.writeDouble(reader.currentDouble());
+ break;
+ case BOOLEAN:
+ writer.writeBoolean(reader.currentBoolean());
+ break;
+ case TEXT:
+ writer.writeBinary(reader.currentBinary());
+ break;
+ default:
+ throw new UnSupportedDataTypeException(
+ String.format("Data type %s is not supported.", type));
+ }
}
reader.readyForNext();
+ iterateReaderToNextValid(reader);
}
@Override
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlock.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlock.java
index 18e61c2ea3..7e4d836bcd 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlock.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlock.java
@@ -347,7 +347,8 @@ public class TsBlock {
int columnCount = getValueColumnCount();
Object[] row = new Object[columnCount + 1];
for (int i = 0; i < columnCount; ++i) {
- row[i] = valueColumns[i].getObject(rowIndex);
+ final Column column = valueColumns[i];
+ row[i] = column.isNull(rowIndex) ? null : column.getObject(rowIndex);
}
row[columnCount] = timeColumn.getObject(rowIndex);