You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2021/09/09 01:43:30 UTC

[iotdb] 01/02: make the new code work

This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch nested-operations
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 86cf4b1955f60ccfa0aee792fac824b8d76f54d8
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Wed Sep 8 18:22:06 2021 +0800

    make the new code work
---
 .../apache/iotdb/db/query/dataset/UDTFDataSet.java | 161 +--------------------
 .../iotdb/db/query/expression/Expression.java      |   2 +-
 .../query/expression/binary/BinaryExpression.java  |   2 +-
 .../query/expression/unary/FunctionExpression.java |   4 +-
 .../query/expression/unary/NegationExpression.java |   2 +-
 .../query/expression/unary/TimeSeriesOperand.java  |   2 +-
 .../iotdb/db/query/udf/core/layer/DAGBuilder.java  |   4 +-
 7 files changed, 14 insertions(+), 163 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFDataSet.java
index 6ed497d..3f69f62 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFDataSet.java
@@ -24,30 +24,11 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.PartialPath;
 import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
 import org.apache.iotdb.db.query.context.QueryContext;
-import org.apache.iotdb.db.query.expression.Expression;
-import org.apache.iotdb.db.query.expression.binary.AdditionExpression;
-import org.apache.iotdb.db.query.expression.binary.BinaryExpression;
-import org.apache.iotdb.db.query.expression.binary.DivisionExpression;
-import org.apache.iotdb.db.query.expression.binary.ModuloExpression;
-import org.apache.iotdb.db.query.expression.binary.MultiplicationExpression;
-import org.apache.iotdb.db.query.expression.binary.SubtractionExpression;
-import org.apache.iotdb.db.query.expression.unary.NegationExpression;
 import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
 import org.apache.iotdb.db.query.reader.series.ManagedSeriesReader;
-import org.apache.iotdb.db.query.udf.api.customizer.strategy.AccessStrategy;
-import org.apache.iotdb.db.query.udf.core.executor.UDTFExecutor;
+import org.apache.iotdb.db.query.udf.core.layer.DAGBuilder;
 import org.apache.iotdb.db.query.udf.core.layer.UDFLayer;
 import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
-import org.apache.iotdb.db.query.udf.core.transformer.ArithmeticAdditionTransformer;
-import org.apache.iotdb.db.query.udf.core.transformer.ArithmeticDivisionTransformer;
-import org.apache.iotdb.db.query.udf.core.transformer.ArithmeticModuloTransformer;
-import org.apache.iotdb.db.query.udf.core.transformer.ArithmeticMultiplicationTransformer;
-import org.apache.iotdb.db.query.udf.core.transformer.ArithmeticNegationTransformer;
-import org.apache.iotdb.db.query.udf.core.transformer.ArithmeticSubtractionTransformer;
-import org.apache.iotdb.db.query.udf.core.transformer.RawQueryPointTransformer;
-import org.apache.iotdb.db.query.udf.core.transformer.Transformer;
-import org.apache.iotdb.db.query.udf.core.transformer.UDFQueryRowTransformer;
-import org.apache.iotdb.db.query.udf.core.transformer.UDFQueryRowWindowTransformer;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
 import org.apache.iotdb.tsfile.read.query.timegenerator.TimeGenerator;
@@ -120,141 +101,11 @@ public abstract class UDTFDataSet extends QueryDataSet {
   }
 
   protected void initTransformers() throws QueryProcessException, IOException {
-    final float memoryBudgetForSingleWindowTransformer =
-        calculateMemoryBudgetForSingleWindowTransformer();
-    final int size = udtfPlan.getPathToIndex().size();
-    transformers = new Transformer[size];
-    for (int i = 0; i < size; ++i) {
-      if (udtfPlan.isUdfColumn(i)) {
-        constructUdfTransformer(i, memoryBudgetForSingleWindowTransformer);
-      } else if (udtfPlan.isArithmeticColumn(i)) {
-        constructArithmeticTransformer(i);
-      } else {
-        constructRawQueryTransformer(i);
-      }
-    }
-  }
-
-  @SuppressWarnings("squid:S3518") // "Math.max(windowTransformerCount, 1)" can't be zero
-  private float calculateMemoryBudgetForSingleWindowTransformer() {
-    int size = udtfPlan.getPathToIndex().size();
-    int windowTransformerCount = 0;
-    for (int i = 0; i < size; ++i) {
-      if (udtfPlan.isUdfColumn(i)) {
-        switch (udtfPlan
-            .getExecutorByDataSetOutputColumnIndex(i)
-            .getConfigurations()
-            .getAccessStrategy()
-            .getAccessStrategyType()) {
-          case SLIDING_SIZE_WINDOW:
-          case SLIDING_TIME_WINDOW:
-            ++windowTransformerCount;
-            break;
-          default:
-            break;
-        }
-      }
-    }
-    return UDF_TRANSFORMER_MEMORY_BUDGET_IN_MB / Math.max(windowTransformerCount, 1);
-  }
-
-  private void constructUdfTransformer(
-      int columnIndex, float memoryBudgetForSingleWindowTransformer)
-      throws QueryProcessException, IOException {
-    UDTFExecutor executor = udtfPlan.getExecutorByDataSetOutputColumnIndex(columnIndex);
-    int[] readerIndexes = calculateUdfReaderIndexes(executor);
-    AccessStrategy accessStrategy = executor.getConfigurations().getAccessStrategy();
-    switch (accessStrategy.getAccessStrategyType()) {
-      case ROW_BY_ROW:
-        transformers[columnIndex] =
-            new UDFQueryRowTransformer(udfLayer.constructRowReader(readerIndexes), executor);
-        break;
-      case SLIDING_SIZE_WINDOW:
-      case SLIDING_TIME_WINDOW:
-        transformers[columnIndex] =
-            new UDFQueryRowWindowTransformer(
-                udfLayer.constructRowWindowReader(
-                    readerIndexes, accessStrategy, memoryBudgetForSingleWindowTransformer),
-                executor);
-        break;
-      default:
-        throw new UnsupportedOperationException("Unsupported transformer access strategy");
-    }
-  }
-
-  private int[] calculateUdfReaderIndexes(UDTFExecutor executor) {
-    List<PartialPath> paths = executor.getExpression().getPaths();
-    int[] readerIndexes = new int[paths.size()];
-    for (int i = 0; i < readerIndexes.length; ++i) {
-      readerIndexes[i] = udtfPlan.getReaderIndex(paths.get(i).getFullPath());
-    }
-    return readerIndexes;
-  }
-
-  private void constructArithmeticTransformer(int columnIndex) {
-    Expression expression =
-        udtfPlan.getResultColumnByDatasetOutputIndex(columnIndex).getExpression();
-
-    // unary expression
-    if (expression instanceof NegationExpression) {
-      transformers[columnIndex] =
-          new ArithmeticNegationTransformer(
-              constructPointReaderBySeriesName(
-                  ((NegationExpression) expression).getExpression().getExpressionString()));
-      return;
-    }
-
-    // binary expression
-    BinaryExpression binaryExpression = (BinaryExpression) expression;
-    if (binaryExpression instanceof AdditionExpression) {
-      transformers[columnIndex] =
-          new ArithmeticAdditionTransformer(
-              constructPointReaderBySeriesName(
-                  binaryExpression.getLeftExpression().getExpressionString()),
-              constructPointReaderBySeriesName(
-                  binaryExpression.getRightExpression().getExpressionString()));
-    } else if (binaryExpression instanceof SubtractionExpression) {
-      transformers[columnIndex] =
-          new ArithmeticSubtractionTransformer(
-              constructPointReaderBySeriesName(
-                  binaryExpression.getLeftExpression().getExpressionString()),
-              constructPointReaderBySeriesName(
-                  binaryExpression.getRightExpression().getExpressionString()));
-    } else if (binaryExpression instanceof MultiplicationExpression) {
-      transformers[columnIndex] =
-          new ArithmeticMultiplicationTransformer(
-              constructPointReaderBySeriesName(
-                  binaryExpression.getLeftExpression().getExpressionString()),
-              constructPointReaderBySeriesName(
-                  binaryExpression.getRightExpression().getExpressionString()));
-    } else if (binaryExpression instanceof DivisionExpression) {
-      transformers[columnIndex] =
-          new ArithmeticDivisionTransformer(
-              constructPointReaderBySeriesName(
-                  binaryExpression.getLeftExpression().getExpressionString()),
-              constructPointReaderBySeriesName(
-                  binaryExpression.getRightExpression().getExpressionString()));
-    } else if (binaryExpression instanceof ModuloExpression) {
-      transformers[columnIndex] =
-          new ArithmeticModuloTransformer(
-              constructPointReaderBySeriesName(
-                  binaryExpression.getLeftExpression().getExpressionString()),
-              constructPointReaderBySeriesName(
-                  binaryExpression.getRightExpression().getExpressionString()));
-    } else {
-      throw new UnsupportedOperationException(binaryExpression.getExpressionString());
-    }
-  }
-
-  private void constructRawQueryTransformer(int columnIndex) {
-    transformers[columnIndex] =
-        new RawQueryPointTransformer(
-            constructPointReaderBySeriesName(
-                udtfPlan.getRawQueryColumnNameByDatasetOutputColumnIndex(columnIndex)));
-  }
-
-  private LayerPointReader constructPointReaderBySeriesName(String seriesName) {
-    return udfLayer.constructPointReader(udtfPlan.getReaderIndex(seriesName));
+    transformers =
+        new DAGBuilder(queryId, udtfPlan, udfLayer, UDF_TRANSFORMER_MEMORY_BUDGET_IN_MB)
+            .buildLayerMemoryAssigner()
+            .buildResultColumnPointReaders()
+            .getResultColumnPointReaders();
   }
 
   public void finalizeUDFs(long queryId) {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java
index 3d4ce07..738cbea 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java
@@ -61,7 +61,7 @@ public abstract class Expression {
   public abstract void updateStatisticsForMemoryAssigner(LayerMemoryAssigner memoryAssigner);
 
   public abstract IntermediateLayer constructIntermediateLayer(
-      int queryId,
+      long queryId,
       UDTFPlan udtfPlan,
       UDFLayer rawTimeSeriesInputLayer,
       Map<Expression, IntermediateLayer> expressionIntermediateLayerMap,
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java
index 383a9a8..49bbee6 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java
@@ -131,7 +131,7 @@ public abstract class BinaryExpression extends Expression {
 
   @Override
   public IntermediateLayer constructIntermediateLayer(
-      int queryId,
+      long queryId,
       UDTFPlan udtfPlan,
       UDFLayer rawTimeSeriesInputLayer,
       Map<Expression, IntermediateLayer> expressionIntermediateLayerMap,
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java
index c40a619..72f64e1 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java
@@ -182,7 +182,7 @@ public class FunctionExpression extends Expression {
 
   @Override
   public IntermediateLayer constructIntermediateLayer(
-      int queryId,
+      long queryId,
       UDTFPlan udtfPlan,
       UDFLayer rawTimeSeriesInputLayer,
       Map<Expression, IntermediateLayer> expressionIntermediateLayerMap,
@@ -214,7 +214,7 @@ public class FunctionExpression extends Expression {
   }
 
   private IntermediateLayer constructUdfInputIntermediateLayer(
-      int queryId,
+      long queryId,
       UDTFPlan udtfPlan,
       UDFLayer rawTimeSeriesInputLayer,
       Map<Expression, IntermediateLayer> expressionIntermediateLayerMap,
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/NegationExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/NegationExpression.java
index eade5be..955e2e4 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/NegationExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/NegationExpression.java
@@ -95,7 +95,7 @@ public class NegationExpression extends Expression {
 
   @Override
   public IntermediateLayer constructIntermediateLayer(
-      int queryId,
+      long queryId,
       UDTFPlan udtfPlan,
       UDFLayer rawTimeSeriesInputLayer,
       Map<Expression, IntermediateLayer> expressionIntermediateLayerMap,
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java
index 6ad0ac3..2952453 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java
@@ -89,7 +89,7 @@ public class TimeSeriesOperand extends Expression {
 
   @Override
   public IntermediateLayer constructIntermediateLayer(
-      int queryId,
+      long queryId,
       UDTFPlan udtfPlan,
       UDFLayer rawTimeSeriesInputLayer,
       Map<Expression, IntermediateLayer> expressionIntermediateLayerMap,
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/DAGBuilder.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/DAGBuilder.java
index 3e021e5..ca403c1 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/DAGBuilder.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/DAGBuilder.java
@@ -33,7 +33,7 @@ import java.util.Map;
 
 public class DAGBuilder {
 
-  private final int queryId;
+  private final long queryId;
   private final UDTFPlan udtfPlan;
   private final UDFLayer rawTimeSeriesInputLayer;
 
@@ -50,7 +50,7 @@ public class DAGBuilder {
   // make sure that only one point reader will be built for one expression.
   private final Map<Expression, IntermediateLayer> expressionIntermediateLayerMap;
 
-  public DAGBuilder(int queryId, UDTFPlan udtfPlan, UDFLayer inputLayer, float memoryBudgetInMB) {
+  public DAGBuilder(long queryId, UDTFPlan udtfPlan, UDFLayer inputLayer, float memoryBudgetInMB) {
     this.queryId = queryId;
     this.udtfPlan = udtfPlan;
     this.rawTimeSeriesInputLayer = inputLayer;