You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2022/04/12 13:01:19 UTC
[iotdb] 03/03: bind expr with input column index in another way
This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch udf-operator
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 14491af4dfe18a4118f184cbef41f198b757f103
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Tue Apr 12 21:00:36 2022 +0800
bind expr with input column index in another way
---
.../apache/iotdb/db/qp/physical/crud/UDTFPlan.java | 37 ++++---------
.../apache/iotdb/db/query/dataset/UDTFDataSet.java | 3 +-
.../iotdb/db/query/expression/Expression.java | 15 ++----
.../query/expression/binary/BinaryExpression.java | 14 +++--
.../db/query/expression/unary/ConstantOperand.java | 8 ++-
.../query/expression/unary/FunctionExpression.java | 26 ++++++----
.../query/expression/unary/LogicNotExpression.java | 17 ++++--
.../query/expression/unary/NegationExpression.java | 11 +++-
.../query/expression/unary/TimeSeriesOperand.java | 10 +++-
.../db/query/udf/core/executor/UDTFContext.java | 60 ++++++++++++++++++++++
.../iotdb/db/query/udf/core/layer/DAGBuilder.java | 9 +++-
.../query/udf/core/layer/RawQueryInputLayer.java | 4 ++
12 files changed, 156 insertions(+), 58 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/UDTFPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/UDTFPlan.java
index 976bfec456..e1c9dbadb7 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/UDTFPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/UDTFPlan.java
@@ -25,9 +25,7 @@ import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.qp.logical.Operator;
import org.apache.iotdb.db.qp.strategy.PhysicalGenerator;
import org.apache.iotdb.db.query.expression.ResultColumn;
-import org.apache.iotdb.db.query.expression.unary.FunctionExpression;
-import org.apache.iotdb.db.query.udf.core.executor.UDTFExecutor;
-import org.apache.iotdb.db.query.udf.service.UDFClassLoaderManager;
+import org.apache.iotdb.db.query.udf.core.executor.UDTFContext;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.Pair;
@@ -43,15 +41,14 @@ import java.util.Set;
public class UDTFPlan extends RawDataQueryPlan implements UDFPlan {
- protected final ZoneId zoneId;
+ protected final UDTFContext udtfContext;
- protected Map<String, UDTFExecutor> expressionName2Executor = new HashMap<>();
- protected Map<Integer, Integer> datasetOutputIndexToResultColumnIndex = new HashMap<>();
- protected Map<String, Integer> pathNameToReaderIndex = new HashMap<>();
+ protected final Map<Integer, Integer> datasetOutputIndexToResultColumnIndex = new HashMap<>();
+ protected final Map<String, Integer> pathNameToReaderIndex = new HashMap<>();
public UDTFPlan(ZoneId zoneId) {
super();
- this.zoneId = zoneId;
+ udtfContext = new UDTFContext(zoneId);
setOperatorType(Operator.OperatorType.UDTF);
}
@@ -128,35 +125,23 @@ public class UDTFPlan extends RawDataQueryPlan implements UDFPlan {
@Override
public void constructUdfExecutors(List<ResultColumn> resultColumns) {
- for (ResultColumn resultColumn : resultColumns) {
- resultColumn.getExpression().constructUdfExecutors(expressionName2Executor, zoneId);
- }
+ udtfContext.constructUdfExecutors(resultColumns);
}
@Override
public void finalizeUDFExecutors(long queryId) {
- try {
- for (UDTFExecutor executor : expressionName2Executor.values()) {
- executor.beforeDestroy();
- }
- } finally {
- UDFClassLoaderManager.getInstance().finalizeUDFQuery(queryId);
- }
+ udtfContext.finalizeUDFExecutors(queryId);
}
public ResultColumn getResultColumnByDatasetOutputIndex(int datasetOutputIndex) {
return resultColumns.get(datasetOutputIndexToResultColumnIndex.get(datasetOutputIndex));
}
- public UDTFExecutor getExecutorByFunctionExpression(FunctionExpression functionExpression) {
- return expressionName2Executor.get(functionExpression.getExpressionString());
- }
-
- public int getReaderIndex(String pathName) {
- return pathNameToReaderIndex.get(pathName);
+ public Integer getReaderIndexByExpressionName(String expressionName) {
+ return pathNameToReaderIndex.get(expressionName);
}
- public int getReaderIndexByExpressionName(String expressionName) {
- return pathNameToReaderIndex.get(expressionName);
+ public UDTFContext getUdtfContext() {
+ return udtfContext;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFDataSet.java
index 1fe27e7c14..c299a355f8 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFDataSet.java
@@ -102,7 +102,7 @@ public abstract class UDTFDataSet extends QueryDataSet {
initDataSetFields();
}
- protected UDTFDataSet(QueryContext queryContext, UDTFPlan udtfPlan, IUDFInputDataSet dataSet)
+ public UDTFDataSet(QueryContext queryContext, UDTFPlan udtfPlan, IUDFInputDataSet dataSet)
throws QueryProcessException, IOException {
queryId = queryContext.getQueryId();
this.udtfPlan = udtfPlan;
@@ -123,6 +123,7 @@ public abstract class UDTFDataSet extends QueryDataSet {
udtfPlan,
rawQueryInputLayer,
UDF_TRANSFORMER_MEMORY_BUDGET_IN_MB + UDF_COLLECTOR_MEMORY_BUDGET_IN_MB)
+ .bindInputLayerColumnIndexWithExpression()
.buildLayerMemoryAssigner()
.buildResultColumnPointReaders()
.setDataSetResultColumnDataTypes()
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java
index 619167bf96..48f231c8fb 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
import org.apache.iotdb.db.mpp.sql.rewriter.WildcardsRemover;
import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
import org.apache.iotdb.db.query.expression.unary.ConstantOperand;
+import org.apache.iotdb.db.query.udf.core.executor.UDTFContext;
import org.apache.iotdb.db.query.udf.core.executor.UDTFExecutor;
import org.apache.iotdb.db.query.udf.core.layer.IntermediateLayer;
import org.apache.iotdb.db.query.udf.core.layer.LayerMemoryAssigner;
@@ -49,7 +50,7 @@ public abstract class Expression {
protected Boolean isConstantOperandCache = null;
- protected Integer tsBlockInputColumnIndex = null;
+ protected Integer inputColumnIndex = null;
public boolean isBuiltInAggregationFunctionExpression() {
return false;
@@ -63,14 +64,6 @@ public abstract class Expression {
return false;
}
- public Integer getTsBlockInputColumnIndex() {
- return tsBlockInputColumnIndex;
- }
-
- public void setTsBlockInputColumnIndex(Integer tsBlockInputColumnIndex) {
- this.tsBlockInputColumnIndex = tsBlockInputColumnIndex;
- }
-
public abstract void concat(
List<PartialPath> prefixPaths,
List<Expression> resultExpressions,
@@ -94,11 +87,13 @@ public abstract class Expression {
public abstract void constructUdfExecutors(
Map<String, UDTFExecutor> expressionName2Executor, ZoneId zoneId);
+ public abstract void bindInputLayerColumnIndexWithExpression(UDTFPlan udtfPlan);
+
public abstract void updateStatisticsForMemoryAssigner(LayerMemoryAssigner memoryAssigner);
public abstract IntermediateLayer constructIntermediateLayer(
long queryId,
- UDTFPlan udtfPlan,
+ UDTFContext udtfContext,
RawQueryInputLayer rawTimeSeriesInputLayer,
Map<Expression, IntermediateLayer> expressionIntermediateLayerMap,
Map<Expression, TSDataType> expressionDataTypeMap,
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java
index b706a52f5d..18eca2151b 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
import org.apache.iotdb.db.mpp.sql.rewriter.WildcardsRemover;
import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
import org.apache.iotdb.db.query.expression.Expression;
+import org.apache.iotdb.db.query.udf.core.executor.UDTFContext;
import org.apache.iotdb.db.query.udf.core.executor.UDTFExecutor;
import org.apache.iotdb.db.query.udf.core.layer.IntermediateLayer;
import org.apache.iotdb.db.query.udf.core.layer.LayerMemoryAssigner;
@@ -205,6 +206,13 @@ public abstract class BinaryExpression extends Expression {
rightExpression.constructUdfExecutors(expressionName2Executor, zoneId);
}
+ @Override
+ public void bindInputLayerColumnIndexWithExpression(UDTFPlan udtfPlan) {
+ leftExpression.bindInputLayerColumnIndexWithExpression(udtfPlan);
+ rightExpression.bindInputLayerColumnIndexWithExpression(udtfPlan);
+ inputColumnIndex = udtfPlan.getReaderIndexByExpressionName(toString());
+ }
+
@Override
public void updateStatisticsForMemoryAssigner(LayerMemoryAssigner memoryAssigner) {
leftExpression.updateStatisticsForMemoryAssigner(memoryAssigner);
@@ -215,7 +223,7 @@ public abstract class BinaryExpression extends Expression {
@Override
public IntermediateLayer constructIntermediateLayer(
long queryId,
- UDTFPlan udtfPlan,
+ UDTFContext udtfContext,
RawQueryInputLayer rawTimeSeriesInputLayer,
Map<Expression, IntermediateLayer> expressionIntermediateLayerMap,
Map<Expression, TSDataType> expressionDataTypeMap,
@@ -227,7 +235,7 @@ public abstract class BinaryExpression extends Expression {
IntermediateLayer leftParentIntermediateLayer =
leftExpression.constructIntermediateLayer(
queryId,
- udtfPlan,
+ udtfContext,
rawTimeSeriesInputLayer,
expressionIntermediateLayerMap,
expressionDataTypeMap,
@@ -235,7 +243,7 @@ public abstract class BinaryExpression extends Expression {
IntermediateLayer rightParentIntermediateLayer =
rightExpression.constructIntermediateLayer(
queryId,
- udtfPlan,
+ udtfContext,
rawTimeSeriesInputLayer,
expressionIntermediateLayerMap,
expressionDataTypeMap,
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/ConstantOperand.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/ConstantOperand.java
index 0d32cfbad5..e8e0088599 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/ConstantOperand.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/ConstantOperand.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
import org.apache.iotdb.db.mpp.sql.rewriter.WildcardsRemover;
import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
import org.apache.iotdb.db.query.expression.Expression;
+import org.apache.iotdb.db.query.udf.core.executor.UDTFContext;
import org.apache.iotdb.db.query.udf.core.executor.UDTFExecutor;
import org.apache.iotdb.db.query.udf.core.layer.ConstantIntermediateLayer;
import org.apache.iotdb.db.query.udf.core.layer.IntermediateLayer;
@@ -102,6 +103,11 @@ public class ConstantOperand extends Expression {
// Do nothing
}
+ @Override
+ public void bindInputLayerColumnIndexWithExpression(UDTFPlan udtfPlan) {
+ // Do nothing
+ }
+
@Override
public void updateStatisticsForMemoryAssigner(LayerMemoryAssigner memoryAssigner) {
// Do nothing
@@ -110,7 +116,7 @@ public class ConstantOperand extends Expression {
@Override
public IntermediateLayer constructIntermediateLayer(
long queryId,
- UDTFPlan udtfPlan,
+ UDTFContext udtfContext,
RawQueryInputLayer rawTimeSeriesInputLayer,
Map<Expression, IntermediateLayer> expressionIntermediateLayerMap,
Map<Expression, TSDataType> expressionDataTypeMap,
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java
index 564ce5bb09..3683ff50fb 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java
@@ -31,6 +31,7 @@ import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
import org.apache.iotdb.db.qp.strategy.optimizer.ConcatPathOptimizer;
import org.apache.iotdb.db.query.expression.Expression;
import org.apache.iotdb.db.query.udf.api.customizer.strategy.AccessStrategy;
+import org.apache.iotdb.db.query.udf.core.executor.UDTFContext;
import org.apache.iotdb.db.query.udf.core.executor.UDTFExecutor;
import org.apache.iotdb.db.query.udf.core.layer.IntermediateLayer;
import org.apache.iotdb.db.query.udf.core.layer.LayerMemoryAssigner;
@@ -246,6 +247,14 @@ public class FunctionExpression extends Expression {
expressionName2Executor.put(expressionString, new UDTFExecutor(this, zoneId));
}
+ @Override
+ public void bindInputLayerColumnIndexWithExpression(UDTFPlan udtfPlan) {
+ for (Expression expression : expressions) {
+ expression.bindInputLayerColumnIndexWithExpression(udtfPlan);
+ }
+ inputColumnIndex = udtfPlan.getReaderIndexByExpressionName(toString());
+ }
+
@Override
public void updateStatisticsForMemoryAssigner(LayerMemoryAssigner memoryAssigner) {
for (Expression expression : expressions) {
@@ -257,7 +266,7 @@ public class FunctionExpression extends Expression {
@Override
public IntermediateLayer constructIntermediateLayer(
long queryId,
- UDTFPlan udtfPlan,
+ UDTFContext udtfContext,
RawQueryInputLayer rawTimeSeriesInputLayer,
Map<Expression, IntermediateLayer> expressionIntermediateLayerMap,
Map<Expression, TSDataType> expressionDataTypeMap,
@@ -269,13 +278,12 @@ public class FunctionExpression extends Expression {
if (isBuiltInAggregationFunctionExpression) {
transformer =
new TransparentTransformer(
- rawTimeSeriesInputLayer.constructPointReader(
- udtfPlan.getReaderIndexByExpressionName(toString())));
+ rawTimeSeriesInputLayer.constructPointReader(inputColumnIndex));
} else {
IntermediateLayer udfInputIntermediateLayer =
constructUdfInputIntermediateLayer(
queryId,
- udtfPlan,
+ udtfContext,
rawTimeSeriesInputLayer,
expressionIntermediateLayerMap,
expressionDataTypeMap,
@@ -283,7 +291,7 @@ public class FunctionExpression extends Expression {
transformer =
constructUdfTransformer(
queryId,
- udtfPlan,
+ udtfContext,
expressionDataTypeMap,
memoryAssigner,
udfInputIntermediateLayer);
@@ -303,7 +311,7 @@ public class FunctionExpression extends Expression {
private IntermediateLayer constructUdfInputIntermediateLayer(
long queryId,
- UDTFPlan udtfPlan,
+ UDTFContext udtfContext,
RawQueryInputLayer rawTimeSeriesInputLayer,
Map<Expression, IntermediateLayer> expressionIntermediateLayerMap,
Map<Expression, TSDataType> expressionDataTypeMap,
@@ -314,7 +322,7 @@ public class FunctionExpression extends Expression {
intermediateLayers.add(
expression.constructIntermediateLayer(
queryId,
- udtfPlan,
+ udtfContext,
rawTimeSeriesInputLayer,
expressionIntermediateLayerMap,
expressionDataTypeMap,
@@ -333,12 +341,12 @@ public class FunctionExpression extends Expression {
private UDFQueryTransformer constructUdfTransformer(
long queryId,
- UDTFPlan udtfPlan,
+ UDTFContext udtfContext,
Map<Expression, TSDataType> expressionDataTypeMap,
LayerMemoryAssigner memoryAssigner,
IntermediateLayer udfInputIntermediateLayer)
throws QueryProcessException, IOException {
- UDTFExecutor executor = udtfPlan.getExecutorByFunctionExpression(this);
+ UDTFExecutor executor = udtfContext.getExecutorByFunctionExpression(this);
executor.beforeStart(queryId, memoryAssigner.assign(), expressionDataTypeMap);
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/LogicNotExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/LogicNotExpression.java
index 66893a0172..33fb9cb5a7 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/LogicNotExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/LogicNotExpression.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
import org.apache.iotdb.db.qp.utils.WildcardsRemover;
import org.apache.iotdb.db.query.expression.Expression;
+import org.apache.iotdb.db.query.udf.core.executor.UDTFContext;
import org.apache.iotdb.db.query.udf.core.executor.UDTFExecutor;
import org.apache.iotdb.db.query.udf.core.layer.IntermediateLayer;
import org.apache.iotdb.db.query.udf.core.layer.LayerMemoryAssigner;
@@ -39,7 +40,11 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import java.io.IOException;
import java.time.ZoneId;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
public class LogicNotExpression extends Expression {
protected Expression expression;
@@ -127,6 +132,12 @@ public class LogicNotExpression extends Expression {
expression.constructUdfExecutors(expressionName2Executor, zoneId);
}
+ @Override
+ public void bindInputLayerColumnIndexWithExpression(UDTFPlan udtfPlan) {
+ expression.bindInputLayerColumnIndexWithExpression(udtfPlan);
+ inputColumnIndex = udtfPlan.getReaderIndexByExpressionName(toString());
+ }
+
@Override
public void updateStatisticsForMemoryAssigner(LayerMemoryAssigner memoryAssigner) {
expression.updateStatisticsForMemoryAssigner(memoryAssigner);
@@ -136,7 +147,7 @@ public class LogicNotExpression extends Expression {
@Override
public IntermediateLayer constructIntermediateLayer(
long queryId,
- UDTFPlan udtfPlan,
+ UDTFContext udtfContext,
RawQueryInputLayer rawTimeSeriesInputLayer,
Map<Expression, IntermediateLayer> expressionIntermediateLayerMap,
Map<Expression, TSDataType> expressionDataTypeMap,
@@ -148,7 +159,7 @@ public class LogicNotExpression extends Expression {
IntermediateLayer parentLayerPointReader =
expression.constructIntermediateLayer(
queryId,
- udtfPlan,
+ udtfContext,
rawTimeSeriesInputLayer,
expressionIntermediateLayerMap,
expressionDataTypeMap,
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/NegationExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/NegationExpression.java
index 04065daa55..5b8dccf1ae 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/NegationExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/NegationExpression.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
import org.apache.iotdb.db.mpp.sql.rewriter.WildcardsRemover;
import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
import org.apache.iotdb.db.query.expression.Expression;
+import org.apache.iotdb.db.query.udf.core.executor.UDTFContext;
import org.apache.iotdb.db.query.udf.core.executor.UDTFExecutor;
import org.apache.iotdb.db.query.udf.core.layer.IntermediateLayer;
import org.apache.iotdb.db.query.udf.core.layer.LayerMemoryAssigner;
@@ -132,6 +133,12 @@ public class NegationExpression extends Expression {
expression.constructUdfExecutors(expressionName2Executor, zoneId);
}
+ @Override
+ public void bindInputLayerColumnIndexWithExpression(UDTFPlan udtfPlan) {
+ expression.bindInputLayerColumnIndexWithExpression(udtfPlan);
+ inputColumnIndex = udtfPlan.getReaderIndexByExpressionName(toString());
+ }
+
@Override
public void updateStatisticsForMemoryAssigner(LayerMemoryAssigner memoryAssigner) {
expression.updateStatisticsForMemoryAssigner(memoryAssigner);
@@ -141,7 +148,7 @@ public class NegationExpression extends Expression {
@Override
public IntermediateLayer constructIntermediateLayer(
long queryId,
- UDTFPlan udtfPlan,
+ UDTFContext udtfContext,
RawQueryInputLayer rawTimeSeriesInputLayer,
Map<Expression, IntermediateLayer> expressionIntermediateLayerMap,
Map<Expression, TSDataType> expressionDataTypeMap,
@@ -153,7 +160,7 @@ public class NegationExpression extends Expression {
IntermediateLayer parentLayerPointReader =
expression.constructIntermediateLayer(
queryId,
- udtfPlan,
+ udtfContext,
rawTimeSeriesInputLayer,
expressionIntermediateLayerMap,
expressionDataTypeMap,
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java
index a9c1052c01..adcb7dff6d 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
import org.apache.iotdb.db.mpp.sql.rewriter.WildcardsRemover;
import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
import org.apache.iotdb.db.query.expression.Expression;
+import org.apache.iotdb.db.query.udf.core.executor.UDTFContext;
import org.apache.iotdb.db.query.udf.core.executor.UDTFExecutor;
import org.apache.iotdb.db.query.udf.core.layer.IntermediateLayer;
import org.apache.iotdb.db.query.udf.core.layer.LayerMemoryAssigner;
@@ -116,6 +117,11 @@ public class TimeSeriesOperand extends Expression {
// nothing to do
}
+ @Override
+ public void bindInputLayerColumnIndexWithExpression(UDTFPlan udtfPlan) {
+ inputColumnIndex = udtfPlan.getReaderIndexByExpressionName(toString());
+ }
+
@Override
public void updateStatisticsForMemoryAssigner(LayerMemoryAssigner memoryAssigner) {
memoryAssigner.increaseExpressionReference(this);
@@ -124,7 +130,7 @@ public class TimeSeriesOperand extends Expression {
@Override
public IntermediateLayer constructIntermediateLayer(
long queryId,
- UDTFPlan udtfPlan,
+ UDTFContext udtfContext,
RawQueryInputLayer rawTimeSeriesInputLayer,
Map<Expression, IntermediateLayer> expressionIntermediateLayerMap,
Map<Expression, TSDataType> expressionDataTypeMap,
@@ -134,7 +140,7 @@ public class TimeSeriesOperand extends Expression {
float memoryBudgetInMB = memoryAssigner.assign();
LayerPointReader parentLayerPointReader =
- rawTimeSeriesInputLayer.constructPointReader(udtfPlan.getReaderIndex(path.getFullPath()));
+ rawTimeSeriesInputLayer.constructPointReader(inputColumnIndex);
expressionDataTypeMap.put(this, parentLayerPointReader.getDataType());
expressionIntermediateLayerMap.put(
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/executor/UDTFContext.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/executor/UDTFContext.java
new file mode 100644
index 0000000000..cb7d467403
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/executor/UDTFContext.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.udf.core.executor;
+
+import org.apache.iotdb.db.query.expression.ResultColumn;
+import org.apache.iotdb.db.query.expression.unary.FunctionExpression;
+import org.apache.iotdb.db.query.udf.service.UDFClassLoaderManager;
+
+import java.time.ZoneId;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class UDTFContext {
+
+ protected final ZoneId zoneId;
+
+ protected Map<String, UDTFExecutor> expressionName2Executor = new HashMap<>();
+
+ public UDTFContext(ZoneId zoneId) {
+ this.zoneId = zoneId;
+ }
+
+ public void constructUdfExecutors(List<ResultColumn> resultColumns) {
+ for (ResultColumn resultColumn : resultColumns) {
+ resultColumn.getExpression().constructUdfExecutors(expressionName2Executor, zoneId);
+ }
+ }
+
+ public void finalizeUDFExecutors(long queryId) {
+ try {
+ for (UDTFExecutor executor : expressionName2Executor.values()) {
+ executor.beforeDestroy();
+ }
+ } finally {
+ UDFClassLoaderManager.getInstance().finalizeUDFQuery(queryId);
+ }
+ }
+
+ public UDTFExecutor getExecutorByFunctionExpression(FunctionExpression functionExpression) {
+ return expressionName2Executor.get(functionExpression.getExpressionString());
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/DAGBuilder.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/DAGBuilder.java
index 184753489d..270482b1e6 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/DAGBuilder.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/DAGBuilder.java
@@ -69,6 +69,13 @@ public class DAGBuilder {
expressionDataTypeMap = new HashMap<>();
}
+ public DAGBuilder bindInputLayerColumnIndexWithExpression() {
+ for (Expression expression : resultColumnExpressions) {
+ expression.bindInputLayerColumnIndexWithExpression(udtfPlan);
+ }
+ return this;
+ }
+
public DAGBuilder buildLayerMemoryAssigner() {
for (Expression expression : resultColumnExpressions) {
expression.updateStatisticsForMemoryAssigner(memoryAssigner);
@@ -83,7 +90,7 @@ public class DAGBuilder {
resultColumnExpressions[i]
.constructIntermediateLayer(
queryId,
- udtfPlan,
+ udtfPlan.getUdtfContext(),
rawTimeSeriesInputLayer,
expressionIntermediateLayerMap,
expressionDataTypeMap,
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/RawQueryInputLayer.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/RawQueryInputLayer.java
index d462da438c..fec8fd71b3 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/RawQueryInputLayer.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/RawQueryInputLayer.java
@@ -94,6 +94,10 @@ public class RawQueryInputLayer {
rowRecordList.setEvictionUpperBound(safetyLine.getSafetyLine());
}
+ public int getInputColumnCount() {
+ return dataTypes.length;
+ }
+
public LayerPointReader constructPointReader(int columnIndex) {
return new InputLayerPointReader(columnIndex);
}