You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2021/09/03 09:07:41 UTC
[iotdb] 02/03: refactor: inputLayer -> udfLayer
This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch nested-operations
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 6dfba2c72dd5d1899f989f88780bed03f1bf51f8
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Fri Sep 3 09:33:18 2021 +0800
refactor: inputLayer -> udfLayer
---
.../db/query/dataset/UDTFAlignByTimeDataSet.java | 4 +--
.../apache/iotdb/db/query/dataset/UDTFDataSet.java | 18 +++++------
.../db/query/dataset/UDTFNonAlignDataSet.java | 2 +-
.../iotdb/db/query/expression/Expression.java | 4 +--
.../query/expression/binary/BinaryExpression.java | 8 ++---
.../query/expression/unary/FunctionExpression.java | 7 +++--
.../query/expression/unary/NegationExpression.java | 6 ++--
.../query/expression/unary/TimeSeriesOperand.java | 7 +++--
.../iotdb/db/query/udf/core/layer/DAGBuilder.java | 35 ++++++++++++----------
.../core/layer/{InputLayer.java => UDFLayer.java} | 6 ++--
10 files changed, 52 insertions(+), 45 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFAlignByTimeDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFAlignByTimeDataSet.java
index 6c065c2..c7e8af4 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFAlignByTimeDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFAlignByTimeDataSet.java
@@ -193,7 +193,7 @@ public class UDTFAlignByTimeDataSet extends UDTFDataSet implements DirectAlignBy
}
// todo: control upper bound here
- inputLayer.updateRowRecordListEvictionUpperBound();
+ udfLayer.updateRowRecordListEvictionUpperBound();
}
/*
@@ -296,7 +296,7 @@ public class UDTFAlignByTimeDataSet extends UDTFDataSet implements DirectAlignBy
}
// todo: control upper bound here
- inputLayer.updateRowRecordListEvictionUpperBound();
+ udfLayer.updateRowRecordListEvictionUpperBound();
return rowRecord;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFDataSet.java
index 0f9ef3d..6ed497d 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFDataSet.java
@@ -36,7 +36,7 @@ import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
import org.apache.iotdb.db.query.reader.series.ManagedSeriesReader;
import org.apache.iotdb.db.query.udf.api.customizer.strategy.AccessStrategy;
import org.apache.iotdb.db.query.udf.core.executor.UDTFExecutor;
-import org.apache.iotdb.db.query.udf.core.layer.InputLayer;
+import org.apache.iotdb.db.query.udf.core.layer.UDFLayer;
import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
import org.apache.iotdb.db.query.udf.core.transformer.ArithmeticAdditionTransformer;
import org.apache.iotdb.db.query.udf.core.transformer.ArithmeticDivisionTransformer;
@@ -67,7 +67,7 @@ public abstract class UDTFDataSet extends QueryDataSet {
protected final long queryId;
protected final UDTFPlan udtfPlan;
- protected final InputLayer inputLayer;
+ protected final UDFLayer udfLayer;
protected LayerPointReader[] transformers;
@@ -84,8 +84,8 @@ public abstract class UDTFDataSet extends QueryDataSet {
super(new ArrayList<>(deduplicatedPaths), deduplicatedDataTypes);
queryId = queryContext.getQueryId();
this.udtfPlan = udtfPlan;
- inputLayer =
- new InputLayer(
+ udfLayer =
+ new UDFLayer(
queryId,
UDF_READER_MEMORY_BUDGET_IN_MB,
deduplicatedPaths,
@@ -108,8 +108,8 @@ public abstract class UDTFDataSet extends QueryDataSet {
super(new ArrayList<>(deduplicatedPaths), deduplicatedDataTypes);
queryId = queryContext.getQueryId();
this.udtfPlan = udtfPlan;
- inputLayer =
- new InputLayer(
+ udfLayer =
+ new UDFLayer(
queryId,
UDF_READER_MEMORY_BUDGET_IN_MB,
deduplicatedPaths,
@@ -167,13 +167,13 @@ public abstract class UDTFDataSet extends QueryDataSet {
switch (accessStrategy.getAccessStrategyType()) {
case ROW_BY_ROW:
transformers[columnIndex] =
- new UDFQueryRowTransformer(inputLayer.constructRowReader(readerIndexes), executor);
+ new UDFQueryRowTransformer(udfLayer.constructRowReader(readerIndexes), executor);
break;
case SLIDING_SIZE_WINDOW:
case SLIDING_TIME_WINDOW:
transformers[columnIndex] =
new UDFQueryRowWindowTransformer(
- inputLayer.constructRowWindowReader(
+ udfLayer.constructRowWindowReader(
readerIndexes, accessStrategy, memoryBudgetForSingleWindowTransformer),
executor);
break;
@@ -254,7 +254,7 @@ public abstract class UDTFDataSet extends QueryDataSet {
}
private LayerPointReader constructPointReaderBySeriesName(String seriesName) {
- return inputLayer.constructPointReader(udtfPlan.getReaderIndex(seriesName));
+ return udfLayer.constructPointReader(udtfPlan.getReaderIndex(seriesName));
}
public void finalizeUDFs(long queryId) {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFNonAlignDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFNonAlignDataSet.java
index 25dd871..e3810bb 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFNonAlignDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFNonAlignDataSet.java
@@ -114,7 +114,7 @@ public class UDTFNonAlignDataSet extends UDTFDataSet implements DirectNonAlignDa
}
// todo: control upper bound here
- inputLayer.updateRowRecordListEvictionUpperBound();
+ udfLayer.updateRowRecordListEvictionUpperBound();
tsQueryNonAlignDataSet.setTimeList(timeBufferList);
tsQueryNonAlignDataSet.setValueList(valueBufferList);
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java
index 9dd75bd..09a15ad 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java
@@ -24,8 +24,8 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
import org.apache.iotdb.db.qp.utils.WildcardsRemover;
-import org.apache.iotdb.db.query.udf.core.layer.InputLayer;
import org.apache.iotdb.db.query.udf.core.layer.IntermediateLayer;
+import org.apache.iotdb.db.query.udf.core.layer.UDFLayer;
import java.util.List;
import java.util.Map;
@@ -53,7 +53,7 @@ public abstract class Expression {
public abstract IntermediateLayer constructIntermediateLayer(
UDTFPlan udtfPlan,
- InputLayer inputLayer,
+ UDFLayer rawTimeSeriesInputLayer,
Map<Expression, IntermediateLayer> expressionIntermediateLayerMap)
throws QueryProcessException;
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java
index e31496c..b793ef9 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java
@@ -25,9 +25,9 @@ import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
import org.apache.iotdb.db.qp.utils.WildcardsRemover;
import org.apache.iotdb.db.query.expression.Expression;
-import org.apache.iotdb.db.query.udf.core.layer.InputLayer;
import org.apache.iotdb.db.query.udf.core.layer.IntermediateLayer;
import org.apache.iotdb.db.query.udf.core.layer.SingleInputMultiOutputIntermediateLayer;
+import org.apache.iotdb.db.query.udf.core.layer.UDFLayer;
import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
import org.apache.iotdb.db.query.udf.core.transformer.ArithmeticBinaryTransformer;
@@ -113,16 +113,16 @@ public abstract class BinaryExpression extends Expression {
@Override
public IntermediateLayer constructIntermediateLayer(
UDTFPlan udtfPlan,
- InputLayer inputLayer,
+ UDFLayer rawTimeSeriesInputLayer,
Map<Expression, IntermediateLayer> expressionIntermediateLayerMap)
throws QueryProcessException {
if (!expressionIntermediateLayerMap.containsKey(this)) {
IntermediateLayer leftParentIntermediateLayer =
leftExpression.constructIntermediateLayer(
- udtfPlan, inputLayer, expressionIntermediateLayerMap);
+ udtfPlan, rawTimeSeriesInputLayer, expressionIntermediateLayerMap);
IntermediateLayer rightParentIntermediateLayer =
rightExpression.constructIntermediateLayer(
- udtfPlan, inputLayer, expressionIntermediateLayerMap);
+ udtfPlan, rawTimeSeriesInputLayer, expressionIntermediateLayerMap);
expressionIntermediateLayerMap.put(
this,
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java
index 696e2d2..d484769 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java
@@ -27,9 +27,9 @@ import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
import org.apache.iotdb.db.qp.strategy.optimizer.ConcatPathOptimizer;
import org.apache.iotdb.db.qp.utils.WildcardsRemover;
import org.apache.iotdb.db.query.expression.Expression;
-import org.apache.iotdb.db.query.udf.core.layer.InputLayer;
import org.apache.iotdb.db.query.udf.core.layer.IntermediateLayer;
import org.apache.iotdb.db.query.udf.core.layer.MultiInputMultiOutputIntermediateLayer;
+import org.apache.iotdb.db.query.udf.core.layer.UDFLayer;
import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
import java.util.ArrayList;
@@ -151,7 +151,7 @@ public class FunctionExpression extends Expression {
@Override
public IntermediateLayer constructIntermediateLayer(
UDTFPlan udtfPlan,
- InputLayer inputLayer,
+ UDFLayer rawTimeSeriesInputLayer,
Map<Expression, IntermediateLayer> expressionIntermediateLayerMap)
throws QueryProcessException {
if (!expressionIntermediateLayerMap.containsKey(this)) {
@@ -159,7 +159,8 @@ public class FunctionExpression extends Expression {
for (Expression expression : expressions) {
parentLayerPointReaders.add(
expression
- .constructIntermediateLayer(udtfPlan, inputLayer, expressionIntermediateLayerMap)
+ .constructIntermediateLayer(
+ udtfPlan, rawTimeSeriesInputLayer, expressionIntermediateLayerMap)
.constructPointReader());
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/NegationExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/NegationExpression.java
index 8dcdc73..326d1af 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/NegationExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/NegationExpression.java
@@ -25,9 +25,9 @@ import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
import org.apache.iotdb.db.qp.utils.WildcardsRemover;
import org.apache.iotdb.db.query.expression.Expression;
-import org.apache.iotdb.db.query.udf.core.layer.InputLayer;
import org.apache.iotdb.db.query.udf.core.layer.IntermediateLayer;
import org.apache.iotdb.db.query.udf.core.layer.SingleInputMultiOutputIntermediateLayer;
+import org.apache.iotdb.db.query.udf.core.layer.UDFLayer;
import org.apache.iotdb.db.query.udf.core.transformer.ArithmeticNegationTransformer;
import java.util.ArrayList;
@@ -79,13 +79,13 @@ public class NegationExpression extends Expression {
@Override
public IntermediateLayer constructIntermediateLayer(
UDTFPlan udtfPlan,
- InputLayer inputLayer,
+ UDFLayer rawTimeSeriesInputLayer,
Map<Expression, IntermediateLayer> expressionIntermediateLayerMap)
throws QueryProcessException {
if (!expressionIntermediateLayerMap.containsKey(this)) {
IntermediateLayer parentIntermediateLayer =
expression.constructIntermediateLayer(
- udtfPlan, inputLayer, expressionIntermediateLayerMap);
+ udtfPlan, rawTimeSeriesInputLayer, expressionIntermediateLayerMap);
expressionIntermediateLayerMap.put(
this,
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java
index 9b17315..a67e621 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java
@@ -25,9 +25,9 @@ import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
import org.apache.iotdb.db.qp.utils.WildcardsRemover;
import org.apache.iotdb.db.query.expression.Expression;
-import org.apache.iotdb.db.query.udf.core.layer.InputLayer;
import org.apache.iotdb.db.query.udf.core.layer.IntermediateLayer;
import org.apache.iotdb.db.query.udf.core.layer.SingleInputMultiOutputIntermediateLayer;
+import org.apache.iotdb.db.query.udf.core.layer.UDFLayer;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import java.util.List;
@@ -74,14 +74,15 @@ public class TimeSeriesOperand extends Expression {
@Override
public IntermediateLayer constructIntermediateLayer(
UDTFPlan udtfPlan,
- InputLayer inputLayer,
+ UDFLayer rawTimeSeriesInputLayer,
Map<Expression, IntermediateLayer> expressionIntermediateLayerMap)
throws QueryProcessException {
if (!expressionIntermediateLayerMap.containsKey(this)) {
expressionIntermediateLayerMap.put(
this,
new SingleInputMultiOutputIntermediateLayer(
- inputLayer.constructPointReader(udtfPlan.getReaderIndex(path.getFullPath())),
+ rawTimeSeriesInputLayer.constructPointReader(
+ udtfPlan.getReaderIndex(path.getFullPath())),
-1,
-1));
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/DAGBuilder.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/DAGBuilder.java
index 472ca2d..6a107d7 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/DAGBuilder.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/DAGBuilder.java
@@ -23,7 +23,7 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
import org.apache.iotdb.db.query.expression.Expression;
import org.apache.iotdb.db.query.expression.ResultColumn;
-import org.apache.iotdb.db.query.udf.core.transformer.Transformer;
+import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
import java.util.ArrayList;
import java.util.HashMap;
@@ -33,42 +33,47 @@ import java.util.Map;
public class DAGBuilder {
private final UDTFPlan udtfPlan;
- private final InputLayer inputLayer;
+ private final UDFLayer rawTimeSeriesInputLayer;
// input
private final List<Expression> resultColumnExpressions;
// output
- private final Transformer[] resultColumnTransformers;
+ private final LayerPointReader[] resultColumnPointReaders;
// all result column expressions will be split into several sub-expressions, each expression has
- // its own transformer. different result column expressions may have the same sub-expressions,
- // but they can share the same transformer. we cache the transformer here to make sure that only
- // one transformer will be built for one expression.
+ // its own result point reader. different result column expressions may have the same
+ // sub-expressions, but they can share the same point reader. we cache the point reader here to
+ // make sure that only one point reader will be built for one expression.
private final Map<Expression, IntermediateLayer> expressionIntermediateLayerMap;
- public DAGBuilder(UDTFPlan udtfPlan, InputLayer inputLayer) throws QueryProcessException {
+ public DAGBuilder(UDTFPlan udtfPlan, UDFLayer inputLayer) throws QueryProcessException {
this.udtfPlan = udtfPlan;
- this.inputLayer = inputLayer;
+ this.rawTimeSeriesInputLayer = inputLayer;
resultColumnExpressions = new ArrayList<>();
for (ResultColumn resultColumn : udtfPlan.getResultColumns()) {
resultColumnExpressions.add(resultColumn.getExpression());
}
- resultColumnTransformers = new Transformer[resultColumnExpressions.size()];
+ resultColumnPointReaders = new LayerPointReader[resultColumnExpressions.size()];
expressionIntermediateLayerMap = new HashMap<>();
build();
}
- public void build() throws QueryProcessException {
- for (Expression resultColumnExpression : resultColumnExpressions) {
- resultColumnExpression.constructIntermediateLayer(
- udtfPlan, inputLayer, expressionIntermediateLayerMap);
+ public DAGBuilder build() throws QueryProcessException {
+ for (int i = 0; i < resultColumnExpressions.size(); ++i) {
+ resultColumnPointReaders[i] =
+ resultColumnExpressions
+ .get(i)
+ .constructIntermediateLayer(
+ udtfPlan, rawTimeSeriesInputLayer, expressionIntermediateLayerMap)
+ .constructPointReader();
}
+ return this;
}
- public Transformer[] getResultColumnTransformers() {
- return resultColumnTransformers;
+ public LayerPointReader[] getResultColumnPointReaders() {
+ return resultColumnPointReaders;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/InputLayer.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/UDFLayer.java
similarity index 99%
rename from server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/InputLayer.java
rename to server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/UDFLayer.java
index f16a210..a613c4c 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/InputLayer.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/UDFLayer.java
@@ -49,7 +49,7 @@ import org.apache.iotdb.tsfile.utils.Binary;
import java.io.IOException;
import java.util.List;
-public class InputLayer {
+public class UDFLayer {
private long queryId;
@@ -61,7 +61,7 @@ public class InputLayer {
private SafetyLine safetyLine;
/** InputLayerWithoutValueFilter */
- public InputLayer(
+ public UDFLayer(
long queryId,
float memoryBudgetInMB,
List<PartialPath> paths,
@@ -75,7 +75,7 @@ public class InputLayer {
}
/** InputLayerWithValueFilter */
- public InputLayer(
+ public UDFLayer(
long queryId,
float memoryBudgetInMB,
List<PartialPath> paths,