You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2021/05/28 04:14:00 UTC

[iotdb] 03/06: refactor UDTFDataSet

This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch iotdb-1400
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 6b05be253a4479e259b1d88164ee2b245db275b2
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Thu May 27 19:55:55 2021 +0800

    refactor UDTFDataSet
---
 .../apache/iotdb/db/query/dataset/UDTFDataSet.java | 96 ++++++++++++----------
 1 file changed, 54 insertions(+), 42 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFDataSet.java
index 19d81c6..4fa2db9 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFDataSet.java
@@ -80,7 +80,7 @@ public abstract class UDTFDataSet extends QueryDataSet {
             readersOfSelectedSeries,
             cached);
     udtfPlan.initializeUdfExecutors(queryId, UDF_COLLECTOR_MEMORY_BUDGET_IN_MB);
-    initTransformers(UDF_TRANSFORMER_MEMORY_BUDGET_IN_MB);
+    initTransformers();
   }
 
   /** execute without value filters */
@@ -102,24 +102,34 @@ public abstract class UDTFDataSet extends QueryDataSet {
             deduplicatedDataTypes,
             readersOfSelectedSeries);
     udtfPlan.initializeUdfExecutors(queryId, UDF_COLLECTOR_MEMORY_BUDGET_IN_MB);
-    initTransformers(UDF_TRANSFORMER_MEMORY_BUDGET_IN_MB);
+    initTransformers();
   }
 
-  @SuppressWarnings("squid:S3518") // "Math.max(windowTransformerCount, 1)" can't be zero
-  protected void initTransformers(float memoryBudgetInMB)
-      throws QueryProcessException, IOException {
-    int size = udtfPlan.getPathToIndex().size();
+  protected void initTransformers() throws QueryProcessException, IOException {
+    final float memoryBudgetForSingleWindowTransformer =
+        calculateMemoryBudgetForSingleWindowTransformer();
+    final int size = udtfPlan.getPathToIndex().size();
     transformers = new Transformer[size];
+    for (int i = 0; i < size; ++i) {
+      if (udtfPlan.isUdfColumn(i)) {
+        constructUDFTransformer(i, memoryBudgetForSingleWindowTransformer);
+      } else {
+        constructRawQueryTransformer(i);
+      }
+    }
+  }
 
+  @SuppressWarnings("squid:S3518") // "Math.max(windowTransformerCount, 1)" can't be zero
+  private float calculateMemoryBudgetForSingleWindowTransformer() {
+    int size = udtfPlan.getPathToIndex().size();
     int windowTransformerCount = 0;
     for (int i = 0; i < size; ++i) {
       if (udtfPlan.isUdfColumn(i)) {
-        AccessStrategy accessStrategy =
-            udtfPlan
-                .getExecutorByDataSetOutputColumnIndex(i)
-                .getConfigurations()
-                .getAccessStrategy();
-        switch (accessStrategy.getAccessStrategyType()) {
+        switch (udtfPlan
+            .getExecutorByDataSetOutputColumnIndex(i)
+            .getConfigurations()
+            .getAccessStrategy()
+            .getAccessStrategyType()) {
           case SLIDING_SIZE_WINDOW:
           case SLIDING_TIME_WINDOW:
             ++windowTransformerCount;
@@ -129,40 +139,34 @@ public abstract class UDTFDataSet extends QueryDataSet {
         }
       }
     }
-    memoryBudgetInMB /= Math.max(windowTransformerCount, 1);
+    return UDF_TRANSFORMER_MEMORY_BUDGET_IN_MB / Math.max(windowTransformerCount, 1);
+  }
 
-    for (int i = 0; i < size; ++i) {
-      if (udtfPlan.isUdfColumn(i)) {
-        UDTFExecutor executor = udtfPlan.getExecutorByDataSetOutputColumnIndex(i);
-        int[] readerIndexes = calculateReaderIndexes(executor);
-        AccessStrategy accessStrategy = executor.getConfigurations().getAccessStrategy();
-        switch (accessStrategy.getAccessStrategyType()) {
-          case ROW_BY_ROW:
-            transformers[i] =
-                new UDFQueryRowTransformer(inputLayer.constructRowReader(readerIndexes), executor);
-            break;
-          case SLIDING_SIZE_WINDOW:
-          case SLIDING_TIME_WINDOW:
-            transformers[i] =
-                new UDFQueryRowWindowTransformer(
-                    inputLayer.constructRowWindowReader(
-                        readerIndexes, accessStrategy, memoryBudgetInMB),
-                    executor);
-            break;
-          default:
-            throw new UnsupportedOperationException("Unsupported transformer access strategy");
-        }
-      } else {
-        transformers[i] =
-            new RawQueryPointTransformer(
-                inputLayer.constructPointReader(
-                    udtfPlan.getReaderIndex(
-                        udtfPlan.getRawQueryColumnNameByDatasetOutputColumnIndex(i))));
-      }
+  private void constructUDFTransformer(
+      int columnIndex, float memoryBudgetForSingleWindowTransformer)
+      throws QueryProcessException, IOException {
+    UDTFExecutor executor = udtfPlan.getExecutorByDataSetOutputColumnIndex(columnIndex);
+    int[] readerIndexes = calculateUDFReaderIndexes(executor);
+    AccessStrategy accessStrategy = executor.getConfigurations().getAccessStrategy();
+    switch (accessStrategy.getAccessStrategyType()) {
+      case ROW_BY_ROW:
+        transformers[columnIndex] =
+            new UDFQueryRowTransformer(inputLayer.constructRowReader(readerIndexes), executor);
+        break;
+      case SLIDING_SIZE_WINDOW:
+      case SLIDING_TIME_WINDOW:
+        transformers[columnIndex] =
+            new UDFQueryRowWindowTransformer(
+                inputLayer.constructRowWindowReader(
+                    readerIndexes, accessStrategy, memoryBudgetForSingleWindowTransformer),
+                executor);
+        break;
+      default:
+        throw new UnsupportedOperationException("Unsupported transformer access strategy");
     }
   }
 
-  private int[] calculateReaderIndexes(UDTFExecutor executor) {
+  private int[] calculateUDFReaderIndexes(UDTFExecutor executor) {
     List<PartialPath> paths = executor.getExpression().getPaths();
     int[] readerIndexes = new int[paths.size()];
     for (int i = 0; i < readerIndexes.length; ++i) {
@@ -171,6 +175,14 @@ public abstract class UDTFDataSet extends QueryDataSet {
     return readerIndexes;
   }
 
+  private void constructRawQueryTransformer(int columnIndex) {
+    transformers[columnIndex] =
+        new RawQueryPointTransformer(
+            inputLayer.constructPointReader(
+                udtfPlan.getReaderIndex(
+                    udtfPlan.getRawQueryColumnNameByDatasetOutputColumnIndex(columnIndex))));
+  }
+
   public void finalizeUDFs(long queryId) {
     udtfPlan.finalizeUDFExecutors(queryId);
   }