You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2021/09/08 10:14:34 UTC
[iotdb] 02/02: make layer connections
This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch nested-operations
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 1c6637dd523b4fcfff6aa4853033ea00ae0eb6c3
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Wed Sep 8 18:13:49 2021 +0800
make layer connections
---
.../apache/iotdb/db/qp/physical/crud/UDTFPlan.java | 23 +++--
.../iotdb/db/query/expression/Expression.java | 12 ++-
.../query/expression/binary/BinaryExpression.java | 69 ++++++++++---
.../query/expression/unary/FunctionExpression.java | 115 ++++++++++++++++++---
.../query/expression/unary/NegationExpression.java | 55 +++++++---
.../query/expression/unary/TimeSeriesOperand.java | 40 +++++--
.../iotdb/db/query/udf/core/layer/DAGBuilder.java | 26 ++++-
.../query/udf/core/layer/LayerMemoryAssigner.java | 67 ++++++++++++
8 files changed, 340 insertions(+), 67 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/UDTFPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/UDTFPlan.java
index aa20652..792802f 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/UDTFPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/UDTFPlan.java
@@ -51,7 +51,7 @@ public class UDTFPlan extends RawDataQueryPlan implements UDFPlan {
protected final ZoneId zoneId;
- protected Map<String, UDTFExecutor> columnName2Executor = new HashMap<>();
+ protected Map<String, UDTFExecutor> expressionName2Executor = new HashMap<>();
protected Map<Integer, UDTFExecutor> originalOutputColumnIndex2Executor = new HashMap<>();
protected Map<Integer, Integer> datasetOutputIndexToResultColumnIndex = new HashMap<>();
@@ -114,21 +114,18 @@ public class UDTFPlan extends RawDataQueryPlan implements UDFPlan {
public void constructUdfExecutors(List<ResultColumn> resultColumns) {
for (int i = 0; i < resultColumns.size(); ++i) {
Expression expression = resultColumns.get(i).getExpression();
- if (!(expression instanceof FunctionExpression)) {
- continue;
+ expression.constructUdfExecutors(expressionName2Executor, zoneId);
+ if (expression instanceof FunctionExpression) {
+ originalOutputColumnIndex2Executor.put(
+ i, expressionName2Executor.get(expression.toString()));
}
-
- String columnName = expression.toString();
- columnName2Executor.computeIfAbsent(
- columnName, k -> new UDTFExecutor((FunctionExpression) expression, zoneId));
- originalOutputColumnIndex2Executor.put(i, columnName2Executor.get(columnName));
}
}
@Override
public void initializeUdfExecutors(long queryId, float collectorMemoryBudgetInMB)
throws QueryProcessException {
- Collection<UDTFExecutor> executors = columnName2Executor.values();
+ Collection<UDTFExecutor> executors = expressionName2Executor.values();
collectorMemoryBudgetInMB /= executors.size();
UDFRegistrationService.getInstance().acquireRegistrationLock();
@@ -146,7 +143,7 @@ public class UDTFPlan extends RawDataQueryPlan implements UDFPlan {
@Override
public void finalizeUDFExecutors(long queryId) {
try {
- for (UDTFExecutor executor : columnName2Executor.values()) {
+ for (UDTFExecutor executor : expressionName2Executor.values()) {
executor.beforeDestroy();
}
} finally {
@@ -184,10 +181,14 @@ public class UDTFPlan extends RawDataQueryPlan implements UDFPlan {
}
public UDTFExecutor getExecutorByDataSetOutputColumnIndex(int datasetOutputIndex) {
- return columnName2Executor.get(
+ return expressionName2Executor.get(
getResultColumnByDatasetOutputIndex(datasetOutputIndex).getResultColumnName());
}
+  /**
+   * Looks up the executor registered for a function expression.
+   *
+   * @param functionExpression the function expression whose expression string is the lookup key
+   * @return the mapped executor, or {@code null} if the map holds no entry for that key
+   */
+  public UDTFExecutor getExecutorByFunctionExpression(FunctionExpression functionExpression) {
+    final String executorKey = functionExpression.getExpressionString();
+    return expressionName2Executor.get(executorKey);
+  }
+
public String getRawQueryColumnNameByDatasetOutputColumnIndex(int datasetOutputIndex) {
return getResultColumnByDatasetOutputIndex(datasetOutputIndex).getResultColumnName();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java
index 10b4117..3d4ce07 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java
@@ -24,10 +24,13 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
import org.apache.iotdb.db.qp.utils.WildcardsRemover;
+import org.apache.iotdb.db.query.udf.core.executor.UDTFExecutor;
import org.apache.iotdb.db.query.udf.core.layer.IntermediateLayer;
+import org.apache.iotdb.db.query.udf.core.layer.LayerMemoryAssigner;
import org.apache.iotdb.db.query.udf.core.layer.UDFLayer;
import java.io.IOException;
+import java.time.ZoneId;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -52,10 +55,17 @@ public abstract class Expression {
public abstract void collectPaths(Set<PartialPath> pathSet);
+ public abstract void constructUdfExecutors(
+ Map<String, UDTFExecutor> expressionName2Executor, ZoneId zoneId);
+
+ public abstract void updateStatisticsForMemoryAssigner(LayerMemoryAssigner memoryAssigner);
+
public abstract IntermediateLayer constructIntermediateLayer(
+ int queryId,
UDTFPlan udtfPlan,
UDFLayer rawTimeSeriesInputLayer,
- Map<Expression, IntermediateLayer> expressionIntermediateLayerMap)
+ Map<Expression, IntermediateLayer> expressionIntermediateLayerMap,
+ LayerMemoryAssigner memoryAssigner)
throws QueryProcessException, IOException;
public String getExpressionString() {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java
index 0ae46e0..383a9a8 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java
@@ -25,11 +25,18 @@ import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
import org.apache.iotdb.db.qp.utils.WildcardsRemover;
import org.apache.iotdb.db.query.expression.Expression;
+import org.apache.iotdb.db.query.udf.core.executor.UDTFExecutor;
import org.apache.iotdb.db.query.udf.core.layer.IntermediateLayer;
+import org.apache.iotdb.db.query.udf.core.layer.LayerMemoryAssigner;
+import org.apache.iotdb.db.query.udf.core.layer.SingleInputColumnMultiReferenceIntermediateLayer;
+import org.apache.iotdb.db.query.udf.core.layer.SingleInputColumnSingleReferenceIntermediateLayer;
import org.apache.iotdb.db.query.udf.core.layer.UDFLayer;
import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
import org.apache.iotdb.db.query.udf.core.transformer.ArithmeticBinaryTransformer;
+import org.apache.iotdb.db.query.udf.core.transformer.Transformer;
+import java.io.IOException;
+import java.time.ZoneId;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -110,27 +117,55 @@ public abstract class BinaryExpression extends Expression {
}
@Override
+  public void constructUdfExecutors(
+      Map<String, UDTFExecutor> expressionName2Executor, ZoneId zoneId) {
+    // A binary expression registers nothing itself; it forwards the request to both operands,
+    // left operand first.
+    for (Expression operand : new Expression[] {leftExpression, rightExpression}) {
+      operand.constructUdfExecutors(expressionName2Executor, zoneId);
+    }
+  }
+
+  /**
+   * Registers one reference for each of the two operands of this binary expression.
+   *
+   * <p>Only the direct operands are counted here; this method does not descend into them.
+   *
+   * @param memoryAssigner the assigner that accumulates per-expression reference counts
+   */
+  @Override
+  public void updateStatisticsForMemoryAssigner(LayerMemoryAssigner memoryAssigner) {
+    for (Expression operand : new Expression[] {leftExpression, rightExpression}) {
+      memoryAssigner.increaseExpressionReference(operand);
+    }
+  }
+
+ @Override
public IntermediateLayer constructIntermediateLayer(
+ int queryId,
UDTFPlan udtfPlan,
UDFLayer rawTimeSeriesInputLayer,
- Map<Expression, IntermediateLayer> expressionIntermediateLayerMap)
- throws QueryProcessException {
+ Map<Expression, IntermediateLayer> expressionIntermediateLayerMap,
+ LayerMemoryAssigner memoryAssigner)
+ throws QueryProcessException, IOException {
if (!expressionIntermediateLayerMap.containsKey(this)) {
- // IntermediateLayer leftParentIntermediateLayer =
- // leftExpression.constructIntermediateLayer(
- // udtfPlan, rawTimeSeriesInputLayer, expressionIntermediateLayerMap);
- // IntermediateLayer rightParentIntermediateLayer =
- // rightExpression.constructIntermediateLayer(
- // udtfPlan, rawTimeSeriesInputLayer, expressionIntermediateLayerMap);
- //
- // expressionIntermediateLayerMap.put(
- // this,
- // new SingleInputColumnMultiReferenceIntermediateLayer(
- // constructTransformer(
- // leftParentIntermediateLayer.constructPointReader(),
- // rightParentIntermediateLayer.constructPointReader()),
- // -1,
- // -1));
+ float memoryBudgetInMB = memoryAssigner.assign();
+
+ IntermediateLayer leftParentIntermediateLayer =
+ leftExpression.constructIntermediateLayer(
+ queryId,
+ udtfPlan,
+ rawTimeSeriesInputLayer,
+ expressionIntermediateLayerMap,
+ memoryAssigner);
+ IntermediateLayer rightParentIntermediateLayer =
+ rightExpression.constructIntermediateLayer(
+ queryId,
+ udtfPlan,
+ rawTimeSeriesInputLayer,
+ expressionIntermediateLayerMap,
+ memoryAssigner);
+ Transformer transformer =
+ constructTransformer(
+ leftParentIntermediateLayer.constructPointReader(),
+ rightParentIntermediateLayer.constructPointReader());
+
+ expressionIntermediateLayerMap.put(
+ this,
+ memoryAssigner.getReference(this) == 1
+ ? new SingleInputColumnSingleReferenceIntermediateLayer(
+ queryId, memoryBudgetInMB, transformer)
+ : new SingleInputColumnMultiReferenceIntermediateLayer(
+ queryId, memoryBudgetInMB, transformer));
}
return expressionIntermediateLayerMap.get(this);
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java
index b1d0efb..c40a619 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java
@@ -27,10 +27,21 @@ import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
import org.apache.iotdb.db.qp.strategy.optimizer.ConcatPathOptimizer;
import org.apache.iotdb.db.qp.utils.WildcardsRemover;
import org.apache.iotdb.db.query.expression.Expression;
+import org.apache.iotdb.db.query.udf.api.customizer.strategy.AccessStrategy;
+import org.apache.iotdb.db.query.udf.core.executor.UDTFExecutor;
import org.apache.iotdb.db.query.udf.core.layer.IntermediateLayer;
+import org.apache.iotdb.db.query.udf.core.layer.LayerMemoryAssigner;
+import org.apache.iotdb.db.query.udf.core.layer.MultiInputColumnIntermediateLayer;
+import org.apache.iotdb.db.query.udf.core.layer.SingleInputColumnMultiReferenceIntermediateLayer;
+import org.apache.iotdb.db.query.udf.core.layer.SingleInputColumnSingleReferenceIntermediateLayer;
import org.apache.iotdb.db.query.udf.core.layer.UDFLayer;
+import org.apache.iotdb.db.query.udf.core.transformer.Transformer;
+import org.apache.iotdb.db.query.udf.core.transformer.UDFQueryRowTransformer;
+import org.apache.iotdb.db.query.udf.core.transformer.UDFQueryRowWindowTransformer;
+import org.apache.iotdb.db.query.udf.core.transformer.UDFQueryTransformer;
import java.io.IOException;
+import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
@@ -38,6 +49,7 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
+import java.util.stream.Collectors;
public class FunctionExpression extends Expression {
@@ -148,28 +160,107 @@ public class FunctionExpression extends Expression {
}
@Override
+  public void constructUdfExecutors(
+      Map<String, UDTFExecutor> expressionName2Executor, ZoneId zoneId) {
+    final String executorKey = getExpressionString();
+    // An executor already registered under this expression string is reused as is.
+    if (!expressionName2Executor.containsKey(executorKey)) {
+      // Let the argument expressions register their executors before this function's own one.
+      for (Expression argument : expressions) {
+        argument.constructUdfExecutors(expressionName2Executor, zoneId);
+      }
+      expressionName2Executor.put(executorKey, new UDTFExecutor(this, zoneId));
+    }
+  }
+
+  /**
+   * Registers one reference for every argument expression of this function.
+   *
+   * <p>Only the direct arguments are counted here; this method does not descend into them.
+   *
+   * @param memoryAssigner the assigner that accumulates per-expression reference counts
+   */
+  @Override
+  public void updateStatisticsForMemoryAssigner(LayerMemoryAssigner memoryAssigner) {
+    for (Expression argument : expressions) {
+      memoryAssigner.increaseExpressionReference(argument);
+    }
+  }
+
+ @Override
public IntermediateLayer constructIntermediateLayer(
+ int queryId,
UDTFPlan udtfPlan,
UDFLayer rawTimeSeriesInputLayer,
- Map<Expression, IntermediateLayer> expressionIntermediateLayerMap)
+ Map<Expression, IntermediateLayer> expressionIntermediateLayerMap,
+ LayerMemoryAssigner memoryAssigner)
throws QueryProcessException, IOException {
if (!expressionIntermediateLayerMap.containsKey(this)) {
- // List<LayerPointReader> parentLayerPointReaders = new ArrayList<>();
- // for (Expression expression : expressions) {
- // parentLayerPointReaders.add(
- // expression
- // .constructIntermediateLayer(
- // udtfPlan, rawTimeSeriesInputLayer, expressionIntermediateLayerMap)
- // .constructPointReader());
- // }
- //
- // expressionIntermediateLayerMap.put(
- // this, new MultiInputColumnIntermediateLayer(-1, -1, parentLayerPointReaders));
+ float memoryBudgetInMB = memoryAssigner.assign();
+
+ IntermediateLayer udfInputIntermediateLayer =
+ constructUdfInputIntermediateLayer(
+ queryId,
+ udtfPlan,
+ rawTimeSeriesInputLayer,
+ expressionIntermediateLayerMap,
+ memoryAssigner);
+ Transformer transformer =
+ constructUdfTransformer(udtfPlan, memoryAssigner, udfInputIntermediateLayer);
+
+ expressionIntermediateLayerMap.put(
+ this,
+ memoryAssigner.getReference(this) == 1
+ ? new SingleInputColumnSingleReferenceIntermediateLayer(
+ queryId, memoryBudgetInMB, transformer)
+ : new SingleInputColumnMultiReferenceIntermediateLayer(
+ queryId, memoryBudgetInMB, transformer));
}
return expressionIntermediateLayerMap.get(this);
}
+  /**
+   * Builds the layer that supplies the input rows of this function.
+   *
+   * <p>One intermediate layer is requested per argument expression. A single argument's layer is
+   * returned directly; any other number of arguments is wrapped in a
+   * {@link MultiInputColumnIntermediateLayer} over the arguments' point readers.
+   */
+  private IntermediateLayer constructUdfInputIntermediateLayer(
+      int queryId,
+      UDTFPlan udtfPlan,
+      UDFLayer rawTimeSeriesInputLayer,
+      Map<Expression, IntermediateLayer> expressionIntermediateLayerMap,
+      LayerMemoryAssigner memoryAssigner)
+      throws QueryProcessException, IOException {
+    List<IntermediateLayer> argumentLayers = new ArrayList<>();
+    for (Expression argument : expressions) {
+      IntermediateLayer argumentLayer =
+          argument.constructIntermediateLayer(
+              queryId,
+              udtfPlan,
+              rawTimeSeriesInputLayer,
+              expressionIntermediateLayerMap,
+              memoryAssigner);
+      argumentLayers.add(argumentLayer);
+    }
+
+    // A lone argument needs no combining layer.
+    if (argumentLayers.size() == 1) {
+      return argumentLayers.get(0);
+    }
+
+    return new MultiInputColumnIntermediateLayer(
+        queryId,
+        memoryAssigner.assign(),
+        argumentLayers.stream()
+            .map(IntermediateLayer::constructPointReader)
+            .collect(Collectors.toList()));
+  }
+
+ private UDFQueryTransformer constructUdfTransformer(
+ UDTFPlan udtfPlan,
+ LayerMemoryAssigner memoryAssigner,
+ IntermediateLayer udfInputIntermediateLayer)
+ throws QueryProcessException, IOException {
+ UDTFExecutor executor = udtfPlan.getExecutorByFunctionExpression(this);
+ AccessStrategy accessStrategy = executor.getConfigurations().getAccessStrategy();
+ switch (accessStrategy.getAccessStrategyType()) {
+ case ROW_BY_ROW:
+ return new UDFQueryRowTransformer(udfInputIntermediateLayer.constructRowReader(), executor);
+ case SLIDING_SIZE_WINDOW:
+ case SLIDING_TIME_WINDOW:
+ return new UDFQueryRowWindowTransformer(
+ udfInputIntermediateLayer.constructRowWindowReader(
+ accessStrategy, memoryAssigner.assign()),
+ executor);
+ default:
+ throw new UnsupportedOperationException("Unsupported transformer access strategy");
+ }
+ }
+
public List<PartialPath> getPaths() {
if (paths == null) {
paths = new ArrayList<>();
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/NegationExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/NegationExpression.java
index 8a13b27..eade5be 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/NegationExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/NegationExpression.java
@@ -25,9 +25,17 @@ import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
import org.apache.iotdb.db.qp.utils.WildcardsRemover;
import org.apache.iotdb.db.query.expression.Expression;
+import org.apache.iotdb.db.query.udf.core.executor.UDTFExecutor;
import org.apache.iotdb.db.query.udf.core.layer.IntermediateLayer;
+import org.apache.iotdb.db.query.udf.core.layer.LayerMemoryAssigner;
+import org.apache.iotdb.db.query.udf.core.layer.SingleInputColumnMultiReferenceIntermediateLayer;
+import org.apache.iotdb.db.query.udf.core.layer.SingleInputColumnSingleReferenceIntermediateLayer;
import org.apache.iotdb.db.query.udf.core.layer.UDFLayer;
+import org.apache.iotdb.db.query.udf.core.transformer.ArithmeticNegationTransformer;
+import org.apache.iotdb.db.query.udf.core.transformer.Transformer;
+import java.io.IOException;
+import java.time.ZoneId;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -75,23 +83,44 @@ public class NegationExpression extends Expression {
}
@Override
+  public void constructUdfExecutors(
+      Map<String, UDTFExecutor> expressionName2Executor, ZoneId zoneId) {
+    // Negation registers no executor of its own; the request is forwarded to the operand.
+    expression.constructUdfExecutors(expressionName2Executor, zoneId);
+  }
+
+  /**
+   * Registers the single reference this negation holds on its operand.
+   *
+   * <p>The operand, not this expression, is counted: BinaryExpression and FunctionExpression count
+   * their children in the same way, and DAGBuilder.buildLayerMemoryAssigner counts the
+   * result-column expression itself before calling this method. Counting {@code this} here left
+   * the operand unregistered, although constructIntermediateLayer asks the assigner for the
+   * operand's reference count.
+   *
+   * @param memoryAssigner the assigner that accumulates per-expression reference counts
+   */
+  @Override
+  public void updateStatisticsForMemoryAssigner(LayerMemoryAssigner memoryAssigner) {
+    memoryAssigner.increaseExpressionReference(expression);
+  }
+
+ @Override
public IntermediateLayer constructIntermediateLayer(
+ int queryId,
UDTFPlan udtfPlan,
UDFLayer rawTimeSeriesInputLayer,
- Map<Expression, IntermediateLayer> expressionIntermediateLayerMap)
- throws QueryProcessException {
+ Map<Expression, IntermediateLayer> expressionIntermediateLayerMap,
+ LayerMemoryAssigner memoryAssigner)
+ throws QueryProcessException, IOException {
if (!expressionIntermediateLayerMap.containsKey(this)) {
- // IntermediateLayer parentIntermediateLayer =
- // expression.constructIntermediateLayer(
- // udtfPlan, rawTimeSeriesInputLayer, expressionIntermediateLayerMap);
- //
- // expressionIntermediateLayerMap.put(
- // this,
- // new SingleInputColumnMultiReferenceIntermediateLayer(
- // new
- // ArithmeticNegationTransformer(parentIntermediateLayer.constructPointReader()),
- // -1,
- // -1));
+ float memoryBudgetInMB = memoryAssigner.assign();
+
+ IntermediateLayer parentLayerPointReader =
+ expression.constructIntermediateLayer(
+ queryId,
+ udtfPlan,
+ rawTimeSeriesInputLayer,
+ expressionIntermediateLayerMap,
+ memoryAssigner);
+ Transformer transformer =
+ new ArithmeticNegationTransformer(parentLayerPointReader.constructPointReader());
+
+ expressionIntermediateLayerMap.put(
+ this,
+ memoryAssigner.getReference(this) == 1
+ ? new SingleInputColumnSingleReferenceIntermediateLayer(
+ queryId, memoryBudgetInMB, transformer)
+ : new SingleInputColumnMultiReferenceIntermediateLayer(
+ queryId, memoryBudgetInMB, transformer));
}
return expressionIntermediateLayerMap.get(this);
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java
index a8db711..6ad0ac3 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java
@@ -25,10 +25,16 @@ import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
import org.apache.iotdb.db.qp.utils.WildcardsRemover;
import org.apache.iotdb.db.query.expression.Expression;
+import org.apache.iotdb.db.query.udf.core.executor.UDTFExecutor;
import org.apache.iotdb.db.query.udf.core.layer.IntermediateLayer;
+import org.apache.iotdb.db.query.udf.core.layer.LayerMemoryAssigner;
+import org.apache.iotdb.db.query.udf.core.layer.SingleInputColumnMultiReferenceIntermediateLayer;
+import org.apache.iotdb.db.query.udf.core.layer.SingleInputColumnSingleReferenceIntermediateLayer;
import org.apache.iotdb.db.query.udf.core.layer.UDFLayer;
+import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import java.time.ZoneId;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -71,19 +77,37 @@ public class TimeSeriesOperand extends Expression {
}
@Override
+  public void constructUdfExecutors(
+      Map<String, UDTFExecutor> expressionName2Executor, ZoneId zoneId) {
+    // Intentionally empty: this operand registers no executor.
+  }
+
+  /**
+   * Registers one reference to this operand itself.
+   *
+   * <p>NOTE(review): DAGBuilder.buildLayerMemoryAssigner already increments the result-column
+   * expression before calling this method, so an operand used directly as a result column is
+   * counted twice - confirm that this is intended.
+   *
+   * @param memoryAssigner the assigner that accumulates per-expression reference counts
+   */
+  @Override
+  public void updateStatisticsForMemoryAssigner(LayerMemoryAssigner memoryAssigner) {
+    memoryAssigner.increaseExpressionReference(this);
+  }
+
+ @Override
public IntermediateLayer constructIntermediateLayer(
+ int queryId,
UDTFPlan udtfPlan,
UDFLayer rawTimeSeriesInputLayer,
- Map<Expression, IntermediateLayer> expressionIntermediateLayerMap)
+ Map<Expression, IntermediateLayer> expressionIntermediateLayerMap,
+ LayerMemoryAssigner memoryAssigner)
throws QueryProcessException {
if (!expressionIntermediateLayerMap.containsKey(this)) {
- // expressionIntermediateLayerMap.put(
- // this,
- // new SingleInputColumnMultiReferenceIntermediateLayer(
- // rawTimeSeriesInputLayer.constructPointReader(
- // udtfPlan.getReaderIndex(path.getFullPath())),
- // -1,
- // -1));
+ float memoryBudgetInMB = memoryAssigner.assign();
+
+ LayerPointReader parentLayerPointReader =
+ rawTimeSeriesInputLayer.constructPointReader(udtfPlan.getReaderIndex(path.getFullPath()));
+
+ expressionIntermediateLayerMap.put(
+ this,
+ memoryAssigner.getReference(this) == 1
+ ? new SingleInputColumnSingleReferenceIntermediateLayer(
+ queryId, memoryBudgetInMB, parentLayerPointReader)
+ : new SingleInputColumnMultiReferenceIntermediateLayer(
+ queryId, memoryBudgetInMB, parentLayerPointReader));
}
return expressionIntermediateLayerMap.get(this);
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/DAGBuilder.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/DAGBuilder.java
index 5f398b4..3e021e5 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/DAGBuilder.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/DAGBuilder.java
@@ -33,6 +33,7 @@ import java.util.Map;
public class DAGBuilder {
+ private final int queryId;
private final UDTFPlan udtfPlan;
private final UDFLayer rawTimeSeriesInputLayer;
@@ -41,14 +42,16 @@ public class DAGBuilder {
// output
private final LayerPointReader[] resultColumnPointReaders;
+ private final LayerMemoryAssigner memoryAssigner;
+
// all result column expressions will be split into several sub-expressions, each expression has
// its own result point reader. different result column expressions may have the same
// sub-expressions, but they can share the same point reader. we cache the point reader here to
// make sure that only one point reader will be built for one expression.
private final Map<Expression, IntermediateLayer> expressionIntermediateLayerMap;
- public DAGBuilder(UDTFPlan udtfPlan, UDFLayer inputLayer)
- throws QueryProcessException, IOException {
+ public DAGBuilder(int queryId, UDTFPlan udtfPlan, UDFLayer inputLayer, float memoryBudgetInMB) {
+ this.queryId = queryId;
this.udtfPlan = udtfPlan;
this.rawTimeSeriesInputLayer = inputLayer;
@@ -58,18 +61,31 @@ public class DAGBuilder {
}
resultColumnPointReaders = new LayerPointReader[resultColumnExpressions.size()];
+ memoryAssigner = new LayerMemoryAssigner(memoryBudgetInMB);
+
expressionIntermediateLayerMap = new HashMap<>();
+ }
- build();
+  /**
+   * Collects the reference statistics of all result-column expressions and finalizes the memory
+   * assigner.
+   *
+   * @return this builder, to allow call chaining
+   */
+  public DAGBuilder buildLayerMemoryAssigner() {
+    resultColumnExpressions.forEach(
+        resultColumnExpression -> {
+          // The result column references the expression; the expression then adds its own counts.
+          memoryAssigner.increaseExpressionReference(resultColumnExpression);
+          resultColumnExpression.updateStatisticsForMemoryAssigner(memoryAssigner);
+        });
+    memoryAssigner.build();
+    return this;
   }
- public DAGBuilder build() throws QueryProcessException, IOException {
+ public DAGBuilder buildResultColumnPointReaders() throws QueryProcessException, IOException {
for (int i = 0; i < resultColumnExpressions.size(); ++i) {
resultColumnPointReaders[i] =
resultColumnExpressions
.get(i)
.constructIntermediateLayer(
- udtfPlan, rawTimeSeriesInputLayer, expressionIntermediateLayerMap)
+ queryId,
+ udtfPlan,
+ rawTimeSeriesInputLayer,
+ expressionIntermediateLayerMap,
+ memoryAssigner)
.constructPointReader();
}
return this;
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/LayerMemoryAssigner.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/LayerMemoryAssigner.java
new file mode 100644
index 0000000..afa2298
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/LayerMemoryAssigner.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.udf.core.layer;
+
+import org.apache.iotdb.db.query.expression.Expression;
+import org.apache.iotdb.db.query.expression.unary.FunctionExpression;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+/**
+ * Divides a fixed memory budget into equal shares, based on how often each expression is
+ * referenced.
+ *
+ * <p>Intended call order: register every reference with
+ * {@link #increaseExpressionReference(Expression)}, call {@link #build()} once, then read
+ * {@link #getReference(Expression)} and {@link #assign()}.
+ */
+public class LayerMemoryAssigner {
+
+  /** Total budget, in MB, that {@link #build()} splits into shares. */
+  private final float memoryBudgetInMB;
+
+  /** Number of registered references per expression. */
+  private final Map<Expression, Integer> expressionReferenceCount;
+
+  /** Size of one share in MB; {@code 0} until {@link #build()} has been called. */
+  private float memoryBudgetForSingleReference;
+
+  /**
+   * @param memoryBudgetInMB the total budget, in MB, to be split by {@link #build()}
+   */
+  public LayerMemoryAssigner(float memoryBudgetInMB) {
+    expressionReferenceCount = new HashMap<>();
+    this.memoryBudgetInMB = memoryBudgetInMB;
+  }
+
+  /**
+   * Records one more reference to the given expression.
+   *
+   * <p>The first reference yields a count of 1. The earlier put-then-put sequence had no
+   * {@code else} branch and stored 2 for the first reference, so {@code getReference(x) == 1}
+   * could never hold.
+   *
+   * @param expression the referenced expression
+   */
+  public void increaseExpressionReference(Expression expression) {
+    expressionReferenceCount.merge(expression, 1, Integer::sum);
+  }
+
+  /**
+   * Computes the size of a single share from the references registered so far.
+   *
+   * <p>Every reference contributes one partition, or two when the expression is a
+   * {@link FunctionExpression}. The budget is divided evenly by the number of partitions.
+   */
+  public void build() {
+    int memoryPartitions = 0;
+    for (Entry<Expression, Integer> referenceEntry : expressionReferenceCount.entrySet()) {
+      final int weight = referenceEntry.getKey() instanceof FunctionExpression ? 2 : 1;
+      memoryPartitions += referenceEntry.getValue() * weight;
+    }
+    // With nothing registered, a float division by zero would produce Infinity or NaN; keep the
+    // whole budget as the single share instead.
+    memoryBudgetForSingleReference =
+        memoryPartitions == 0 ? memoryBudgetInMB : memoryBudgetInMB / memoryPartitions;
+  }
+
+  /**
+   * Returns how many references were registered for the given expression.
+   *
+   * @param expression the expression to look up
+   * @return the registered reference count, or {@code 0} for an expression that was never
+   *     registered (previously this case failed with a NullPointerException while unboxing)
+   */
+  public int getReference(Expression expression) {
+    return expressionReferenceCount.getOrDefault(expression, 0);
+  }
+
+  /**
+   * Returns the size of one share.
+   *
+   * @return the per-share budget in MB as computed by the last {@link #build()} call
+   */
+  public float assign() {
+    return memoryBudgetForSingleReference;
+  }
+}