You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2021/09/09 01:43:30 UTC
[iotdb] 01/02: make the new code work
This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch nested-operations
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 86cf4b1955f60ccfa0aee792fac824b8d76f54d8
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Wed Sep 8 18:22:06 2021 +0800
make the new code work
---
.../apache/iotdb/db/query/dataset/UDTFDataSet.java | 161 +--------------------
.../iotdb/db/query/expression/Expression.java | 2 +-
.../query/expression/binary/BinaryExpression.java | 2 +-
.../query/expression/unary/FunctionExpression.java | 4 +-
.../query/expression/unary/NegationExpression.java | 2 +-
.../query/expression/unary/TimeSeriesOperand.java | 2 +-
.../iotdb/db/query/udf/core/layer/DAGBuilder.java | 4 +-
7 files changed, 14 insertions(+), 163 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFDataSet.java
index 6ed497d..3f69f62 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFDataSet.java
@@ -24,30 +24,11 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
import org.apache.iotdb.db.query.context.QueryContext;
-import org.apache.iotdb.db.query.expression.Expression;
-import org.apache.iotdb.db.query.expression.binary.AdditionExpression;
-import org.apache.iotdb.db.query.expression.binary.BinaryExpression;
-import org.apache.iotdb.db.query.expression.binary.DivisionExpression;
-import org.apache.iotdb.db.query.expression.binary.ModuloExpression;
-import org.apache.iotdb.db.query.expression.binary.MultiplicationExpression;
-import org.apache.iotdb.db.query.expression.binary.SubtractionExpression;
-import org.apache.iotdb.db.query.expression.unary.NegationExpression;
import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
import org.apache.iotdb.db.query.reader.series.ManagedSeriesReader;
-import org.apache.iotdb.db.query.udf.api.customizer.strategy.AccessStrategy;
-import org.apache.iotdb.db.query.udf.core.executor.UDTFExecutor;
+import org.apache.iotdb.db.query.udf.core.layer.DAGBuilder;
import org.apache.iotdb.db.query.udf.core.layer.UDFLayer;
import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
-import org.apache.iotdb.db.query.udf.core.transformer.ArithmeticAdditionTransformer;
-import org.apache.iotdb.db.query.udf.core.transformer.ArithmeticDivisionTransformer;
-import org.apache.iotdb.db.query.udf.core.transformer.ArithmeticModuloTransformer;
-import org.apache.iotdb.db.query.udf.core.transformer.ArithmeticMultiplicationTransformer;
-import org.apache.iotdb.db.query.udf.core.transformer.ArithmeticNegationTransformer;
-import org.apache.iotdb.db.query.udf.core.transformer.ArithmeticSubtractionTransformer;
-import org.apache.iotdb.db.query.udf.core.transformer.RawQueryPointTransformer;
-import org.apache.iotdb.db.query.udf.core.transformer.Transformer;
-import org.apache.iotdb.db.query.udf.core.transformer.UDFQueryRowTransformer;
-import org.apache.iotdb.db.query.udf.core.transformer.UDFQueryRowWindowTransformer;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
import org.apache.iotdb.tsfile.read.query.timegenerator.TimeGenerator;
@@ -120,141 +101,11 @@ public abstract class UDTFDataSet extends QueryDataSet {
}
protected void initTransformers() throws QueryProcessException, IOException {
- final float memoryBudgetForSingleWindowTransformer =
- calculateMemoryBudgetForSingleWindowTransformer();
- final int size = udtfPlan.getPathToIndex().size();
- transformers = new Transformer[size];
- for (int i = 0; i < size; ++i) {
- if (udtfPlan.isUdfColumn(i)) {
- constructUdfTransformer(i, memoryBudgetForSingleWindowTransformer);
- } else if (udtfPlan.isArithmeticColumn(i)) {
- constructArithmeticTransformer(i);
- } else {
- constructRawQueryTransformer(i);
- }
- }
- }
-
- @SuppressWarnings("squid:S3518") // "Math.max(windowTransformerCount, 1)" can't be zero
- private float calculateMemoryBudgetForSingleWindowTransformer() {
- int size = udtfPlan.getPathToIndex().size();
- int windowTransformerCount = 0;
- for (int i = 0; i < size; ++i) {
- if (udtfPlan.isUdfColumn(i)) {
- switch (udtfPlan
- .getExecutorByDataSetOutputColumnIndex(i)
- .getConfigurations()
- .getAccessStrategy()
- .getAccessStrategyType()) {
- case SLIDING_SIZE_WINDOW:
- case SLIDING_TIME_WINDOW:
- ++windowTransformerCount;
- break;
- default:
- break;
- }
- }
- }
- return UDF_TRANSFORMER_MEMORY_BUDGET_IN_MB / Math.max(windowTransformerCount, 1);
- }
-
- private void constructUdfTransformer(
- int columnIndex, float memoryBudgetForSingleWindowTransformer)
- throws QueryProcessException, IOException {
- UDTFExecutor executor = udtfPlan.getExecutorByDataSetOutputColumnIndex(columnIndex);
- int[] readerIndexes = calculateUdfReaderIndexes(executor);
- AccessStrategy accessStrategy = executor.getConfigurations().getAccessStrategy();
- switch (accessStrategy.getAccessStrategyType()) {
- case ROW_BY_ROW:
- transformers[columnIndex] =
- new UDFQueryRowTransformer(udfLayer.constructRowReader(readerIndexes), executor);
- break;
- case SLIDING_SIZE_WINDOW:
- case SLIDING_TIME_WINDOW:
- transformers[columnIndex] =
- new UDFQueryRowWindowTransformer(
- udfLayer.constructRowWindowReader(
- readerIndexes, accessStrategy, memoryBudgetForSingleWindowTransformer),
- executor);
- break;
- default:
- throw new UnsupportedOperationException("Unsupported transformer access strategy");
- }
- }
-
- private int[] calculateUdfReaderIndexes(UDTFExecutor executor) {
- List<PartialPath> paths = executor.getExpression().getPaths();
- int[] readerIndexes = new int[paths.size()];
- for (int i = 0; i < readerIndexes.length; ++i) {
- readerIndexes[i] = udtfPlan.getReaderIndex(paths.get(i).getFullPath());
- }
- return readerIndexes;
- }
-
- private void constructArithmeticTransformer(int columnIndex) {
- Expression expression =
- udtfPlan.getResultColumnByDatasetOutputIndex(columnIndex).getExpression();
-
- // unary expression
- if (expression instanceof NegationExpression) {
- transformers[columnIndex] =
- new ArithmeticNegationTransformer(
- constructPointReaderBySeriesName(
- ((NegationExpression) expression).getExpression().getExpressionString()));
- return;
- }
-
- // binary expression
- BinaryExpression binaryExpression = (BinaryExpression) expression;
- if (binaryExpression instanceof AdditionExpression) {
- transformers[columnIndex] =
- new ArithmeticAdditionTransformer(
- constructPointReaderBySeriesName(
- binaryExpression.getLeftExpression().getExpressionString()),
- constructPointReaderBySeriesName(
- binaryExpression.getRightExpression().getExpressionString()));
- } else if (binaryExpression instanceof SubtractionExpression) {
- transformers[columnIndex] =
- new ArithmeticSubtractionTransformer(
- constructPointReaderBySeriesName(
- binaryExpression.getLeftExpression().getExpressionString()),
- constructPointReaderBySeriesName(
- binaryExpression.getRightExpression().getExpressionString()));
- } else if (binaryExpression instanceof MultiplicationExpression) {
- transformers[columnIndex] =
- new ArithmeticMultiplicationTransformer(
- constructPointReaderBySeriesName(
- binaryExpression.getLeftExpression().getExpressionString()),
- constructPointReaderBySeriesName(
- binaryExpression.getRightExpression().getExpressionString()));
- } else if (binaryExpression instanceof DivisionExpression) {
- transformers[columnIndex] =
- new ArithmeticDivisionTransformer(
- constructPointReaderBySeriesName(
- binaryExpression.getLeftExpression().getExpressionString()),
- constructPointReaderBySeriesName(
- binaryExpression.getRightExpression().getExpressionString()));
- } else if (binaryExpression instanceof ModuloExpression) {
- transformers[columnIndex] =
- new ArithmeticModuloTransformer(
- constructPointReaderBySeriesName(
- binaryExpression.getLeftExpression().getExpressionString()),
- constructPointReaderBySeriesName(
- binaryExpression.getRightExpression().getExpressionString()));
- } else {
- throw new UnsupportedOperationException(binaryExpression.getExpressionString());
- }
- }
-
- private void constructRawQueryTransformer(int columnIndex) {
- transformers[columnIndex] =
- new RawQueryPointTransformer(
- constructPointReaderBySeriesName(
- udtfPlan.getRawQueryColumnNameByDatasetOutputColumnIndex(columnIndex)));
- }
-
- private LayerPointReader constructPointReaderBySeriesName(String seriesName) {
- return udfLayer.constructPointReader(udtfPlan.getReaderIndex(seriesName));
+ transformers =
+ new DAGBuilder(queryId, udtfPlan, udfLayer, UDF_TRANSFORMER_MEMORY_BUDGET_IN_MB)
+ .buildLayerMemoryAssigner()
+ .buildResultColumnPointReaders()
+ .getResultColumnPointReaders();
}
public void finalizeUDFs(long queryId) {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java
index 3d4ce07..738cbea 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java
@@ -61,7 +61,7 @@ public abstract class Expression {
public abstract void updateStatisticsForMemoryAssigner(LayerMemoryAssigner memoryAssigner);
public abstract IntermediateLayer constructIntermediateLayer(
- int queryId,
+ long queryId,
UDTFPlan udtfPlan,
UDFLayer rawTimeSeriesInputLayer,
Map<Expression, IntermediateLayer> expressionIntermediateLayerMap,
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java
index 383a9a8..49bbee6 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java
@@ -131,7 +131,7 @@ public abstract class BinaryExpression extends Expression {
@Override
public IntermediateLayer constructIntermediateLayer(
- int queryId,
+ long queryId,
UDTFPlan udtfPlan,
UDFLayer rawTimeSeriesInputLayer,
Map<Expression, IntermediateLayer> expressionIntermediateLayerMap,
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java
index c40a619..72f64e1 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java
@@ -182,7 +182,7 @@ public class FunctionExpression extends Expression {
@Override
public IntermediateLayer constructIntermediateLayer(
- int queryId,
+ long queryId,
UDTFPlan udtfPlan,
UDFLayer rawTimeSeriesInputLayer,
Map<Expression, IntermediateLayer> expressionIntermediateLayerMap,
@@ -214,7 +214,7 @@ public class FunctionExpression extends Expression {
}
private IntermediateLayer constructUdfInputIntermediateLayer(
- int queryId,
+ long queryId,
UDTFPlan udtfPlan,
UDFLayer rawTimeSeriesInputLayer,
Map<Expression, IntermediateLayer> expressionIntermediateLayerMap,
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/NegationExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/NegationExpression.java
index eade5be..955e2e4 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/NegationExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/NegationExpression.java
@@ -95,7 +95,7 @@ public class NegationExpression extends Expression {
@Override
public IntermediateLayer constructIntermediateLayer(
- int queryId,
+ long queryId,
UDTFPlan udtfPlan,
UDFLayer rawTimeSeriesInputLayer,
Map<Expression, IntermediateLayer> expressionIntermediateLayerMap,
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java
index 6ad0ac3..2952453 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java
@@ -89,7 +89,7 @@ public class TimeSeriesOperand extends Expression {
@Override
public IntermediateLayer constructIntermediateLayer(
- int queryId,
+ long queryId,
UDTFPlan udtfPlan,
UDFLayer rawTimeSeriesInputLayer,
Map<Expression, IntermediateLayer> expressionIntermediateLayerMap,
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/DAGBuilder.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/DAGBuilder.java
index 3e021e5..ca403c1 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/DAGBuilder.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/DAGBuilder.java
@@ -33,7 +33,7 @@ import java.util.Map;
public class DAGBuilder {
- private final int queryId;
+ private final long queryId;
private final UDTFPlan udtfPlan;
private final UDFLayer rawTimeSeriesInputLayer;
@@ -50,7 +50,7 @@ public class DAGBuilder {
// make sure that only one point reader will be built for one expression.
private final Map<Expression, IntermediateLayer> expressionIntermediateLayerMap;
- public DAGBuilder(int queryId, UDTFPlan udtfPlan, UDFLayer inputLayer, float memoryBudgetInMB) {
+ public DAGBuilder(long queryId, UDTFPlan udtfPlan, UDFLayer inputLayer, float memoryBudgetInMB) {
this.queryId = queryId;
this.udtfPlan = udtfPlan;
this.rawTimeSeriesInputLayer = inputLayer;