You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2022/04/18 12:36:23 UTC
[iotdb] branch master updated: [IOTDB-2911] TransformOperator: operator implementation for nested expression evaluation (#5569)
This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 38364cf79d [IOTDB-2911] TransformOperator: operator implementation for nested expression evaluation (#5569)
38364cf79d is described below
commit 38364cf79d0e50122e1fbcf02e9294898c27e6c2
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Mon Apr 18 20:36:18 2022 +0800
[IOTDB-2911] TransformOperator: operator implementation for nested expression evaluation (#5569)
---
.../db/mpp/operator/process/TransformOperator.java | 152 ++++++++++++++++++---
.../tsfile/read/common/block/TsBlockBuilder.java | 15 +-
2 files changed, 145 insertions(+), 22 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TransformOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TransformOperator.java
index e2c452874f..48d0c71185 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TransformOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TransformOperator.java
@@ -21,7 +21,9 @@ package org.apache.iotdb.db.mpp.operator.process;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.mpp.operator.Operator;
import org.apache.iotdb.db.mpp.operator.OperatorContext;
+import org.apache.iotdb.db.query.dataset.IUDFInputDataSet;
import org.apache.iotdb.db.query.expression.Expression;
import org.apache.iotdb.db.query.udf.core.executor.UDTFContext;
import org.apache.iotdb.db.query.udf.core.layer.EvaluationDAGBuilder;
@@ -30,44 +32,66 @@ import org.apache.iotdb.db.query.udf.core.layer.TsBlockInputDataSet;
import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
import org.apache.iotdb.db.query.udf.service.UDFClassLoaderManager;
import org.apache.iotdb.db.query.udf.service.UDFRegistrationService;
+import org.apache.iotdb.db.utils.datastructure.TimeSelector;
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
+
+import com.google.common.util.concurrent.ListenableFuture;
import java.io.IOException;
import java.util.List;
public class TransformOperator implements ProcessOperator {
- protected static final float UDF_READER_MEMORY_BUDGET_IN_MB =
+ private static final int FETCH_SIZE = 10000;
+
+ private final float udfReaderMemoryBudgetInMB =
IoTDBDescriptor.getInstance().getConfig().getUdfReaderMemoryBudgetInMB();
- protected static final float UDF_TRANSFORMER_MEMORY_BUDGET_IN_MB =
+ private final float udfTransformerMemoryBudgetInMB =
IoTDBDescriptor.getInstance().getConfig().getUdfTransformerMemoryBudgetInMB();
- protected static final float UDF_COLLECTOR_MEMORY_BUDGET_IN_MB =
+ private final float udfCollectorMemoryBudgetInMB =
IoTDBDescriptor.getInstance().getConfig().getUdfCollectorMemoryBudgetInMB();
private final OperatorContext operatorContext;
-
+ private final Operator inputOperator;
+ private final List<TSDataType> inputDataTypes;
private final Expression[] outputExpressions;
private final UDTFContext udtfContext;
+ private final boolean keepNull;
+ private IUDFInputDataSet inputDataset;
private LayerPointReader[] transformers;
+ private TimeSelector timeHeap;
public TransformOperator(
OperatorContext operatorContext,
+ Operator inputOperator,
List<TSDataType> inputDataTypes,
Expression[] outputExpressions,
- UDTFContext udtfContext)
+ UDTFContext udtfContext,
+ boolean keepNull)
throws QueryProcessException, IOException {
this.operatorContext = operatorContext;
-
+ this.inputOperator = inputOperator;
+ this.inputDataTypes = inputDataTypes;
this.outputExpressions = outputExpressions;
this.udtfContext = udtfContext;
+ this.keepNull = keepNull;
- initTransformers(inputDataTypes);
+ initInputDataset(inputDataTypes);
+ initTransformers();
+ initTimeHeap();
}
- protected void initTransformers(List<TSDataType> inputDataTypes)
- throws QueryProcessException, IOException {
+ private void initInputDataset(List<TSDataType> inputDataTypes) {
+ inputDataset = new TsBlockInputDataSet(inputOperator, inputDataTypes);
+ }
+
+ private void initTransformers() throws QueryProcessException, IOException {
UDFRegistrationService.getInstance().acquireRegistrationLock();
// This statement must be surrounded by the registration lock.
UDFClassLoaderManager.getInstance().initializeUDFQuery(operatorContext.getOperatorId());
@@ -77,12 +101,10 @@ public class TransformOperator implements ProcessOperator {
new EvaluationDAGBuilder(
operatorContext.getOperatorId(),
new RawQueryInputLayer(
- operatorContext.getOperatorId(),
- UDF_READER_MEMORY_BUDGET_IN_MB,
- new TsBlockInputDataSet(this, inputDataTypes)),
+ operatorContext.getOperatorId(), udfReaderMemoryBudgetInMB, inputDataset),
outputExpressions,
udtfContext,
- UDF_TRANSFORMER_MEMORY_BUDGET_IN_MB + UDF_COLLECTOR_MEMORY_BUDGET_IN_MB)
+ udfTransformerMemoryBudgetInMB + udfCollectorMemoryBudgetInMB)
.buildLayerMemoryAssigner()
.buildResultColumnPointReaders()
.getOutputPointReaders();
@@ -91,28 +113,116 @@ public class TransformOperator implements ProcessOperator {
}
}
- @Override
- public OperatorContext getOperatorContext() {
- return operatorContext;
+ private void initTimeHeap() throws QueryProcessException, IOException {
+ timeHeap = new TimeSelector(transformers.length << 1, true);
+ for (LayerPointReader reader : transformers) {
+ iterateReaderToNextValid(reader);
+ }
}
- @Override
- public TsBlock next() {
- return null;
+ private void iterateReaderToNextValid(LayerPointReader reader)
+ throws QueryProcessException, IOException {
+ // Since a constant operand is not allowed to be a result column, the reader will not be
+ // a ConstantLayerPointReader.
+ // If keepNull is false, we must iterate the reader until a non-null row is returned.
+ while (reader.next()) {
+ if (reader.isCurrentNull() && !keepNull) {
+ reader.readyForNext();
+ continue;
+ }
+ timeHeap.add(reader.currentTime());
+ break;
+ }
}
@Override
public boolean hasNext() {
- return false;
+ return !timeHeap.isEmpty();
+ }
+
+ @Override
+ public TsBlock next() {
+ final TsBlockBuilder tsBlockBuilder = TsBlockBuilder.createWithOnlyTimeColumn();
+ tsBlockBuilder.buildValueColumnBuilders(inputDataTypes);
+
+ final TimeColumnBuilder timeBuilder = tsBlockBuilder.getTimeColumnBuilder();
+ final ColumnBuilder[] columnBuilders = tsBlockBuilder.getValueColumnBuilders();
+ final int columnCount = columnBuilders.length;
+
+ int rowCount = 0;
+ try {
+ while (rowCount < FETCH_SIZE && !timeHeap.isEmpty()) {
+ long minTime = timeHeap.pollFirst();
+
+ timeBuilder.writeLong(minTime);
+
+ for (int i = 0; i < columnCount; ++i) {
+ LayerPointReader reader = transformers[i];
+
+ if (!reader.next() || reader.currentTime() != minTime || reader.isCurrentNull()) {
+ columnBuilders[i].appendNull();
+ continue;
+ }
+
+ TSDataType type = reader.getDataType();
+ switch (type) {
+ case INT32:
+ columnBuilders[i].writeInt(reader.currentInt());
+ break;
+ case INT64:
+ columnBuilders[i].writeLong(reader.currentLong());
+ break;
+ case FLOAT:
+ columnBuilders[i].writeFloat(reader.currentFloat());
+ break;
+ case DOUBLE:
+ columnBuilders[i].writeDouble(reader.currentDouble());
+ break;
+ case BOOLEAN:
+ columnBuilders[i].writeBoolean(reader.currentBoolean());
+ break;
+ case TEXT:
+ columnBuilders[i].writeBinary(reader.currentBinary());
+ break;
+ default:
+ throw new UnSupportedDataTypeException(
+ String.format("Data type %s is not supported.", type));
+ }
+
+ reader.readyForNext();
+
+ iterateReaderToNextValid(reader);
+ }
+
+ ++rowCount;
+ }
+ } catch (Exception e) {
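+ // TODO: decide whether next() should rethrow a checked exception; for now any failure is wrapped in a RuntimeException.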
+ throw new RuntimeException(e);
+ }
+
+ return tsBlockBuilder.build();
}
@Override
public void close() throws Exception {
udtfContext.finalizeUDFExecutors(operatorContext.getOperatorId());
+
+ inputOperator.close();
+ }
+
+ @Override
+ public ListenableFuture<Void> isBlocked() {
+ return inputOperator.isBlocked();
}
@Override
public boolean isFinished() throws IOException {
- return false;
+ return !hasNext();
+ }
+
+ @Override
+ public OperatorContext getOperatorContext() {
+ return operatorContext;
}
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlockBuilder.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlockBuilder.java
index 909533553f..8de927b785 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlockBuilder.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlockBuilder.java
@@ -19,7 +19,16 @@
package org.apache.iotdb.tsfile.read.common.block;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.read.common.block.column.*;
+import org.apache.iotdb.tsfile.read.common.block.column.BinaryColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.BooleanColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.DoubleColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.FloatColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.IntColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.LongColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
import java.util.List;
@@ -223,6 +232,10 @@ public class TsBlockBuilder {
return valueColumnBuilders[channel];
}
+ public ColumnBuilder[] getValueColumnBuilders() {
+ return valueColumnBuilders;
+ }
+
public TSDataType getType(int channel) {
return types.get(channel);
}