You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2022/04/18 12:36:23 UTC

[iotdb] branch master updated: [IOTDB-2911] TransformOperator: operator implementation for nested expression evaluation (#5569)

This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 38364cf79d [IOTDB-2911] TransformOperator: operator implementation for nested expression evaluation (#5569)
38364cf79d is described below

commit 38364cf79d0e50122e1fbcf02e9294898c27e6c2
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Mon Apr 18 20:36:18 2022 +0800

    [IOTDB-2911] TransformOperator: operator implementation for nested expression evaluation (#5569)
---
 .../db/mpp/operator/process/TransformOperator.java | 152 ++++++++++++++++++---
 .../tsfile/read/common/block/TsBlockBuilder.java   |  15 +-
 2 files changed, 145 insertions(+), 22 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TransformOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TransformOperator.java
index e2c452874f..48d0c71185 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TransformOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TransformOperator.java
@@ -21,7 +21,9 @@ package org.apache.iotdb.db.mpp.operator.process;
 
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.mpp.operator.Operator;
 import org.apache.iotdb.db.mpp.operator.OperatorContext;
+import org.apache.iotdb.db.query.dataset.IUDFInputDataSet;
 import org.apache.iotdb.db.query.expression.Expression;
 import org.apache.iotdb.db.query.udf.core.executor.UDTFContext;
 import org.apache.iotdb.db.query.udf.core.layer.EvaluationDAGBuilder;
@@ -30,44 +32,66 @@ import org.apache.iotdb.db.query.udf.core.layer.TsBlockInputDataSet;
 import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
 import org.apache.iotdb.db.query.udf.service.UDFClassLoaderManager;
 import org.apache.iotdb.db.query.udf.service.UDFRegistrationService;
+import org.apache.iotdb.db.utils.datastructure.TimeSelector;
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
+
+import com.google.common.util.concurrent.ListenableFuture;
 
 import java.io.IOException;
 import java.util.List;
 
 public class TransformOperator implements ProcessOperator {
 
-  protected static final float UDF_READER_MEMORY_BUDGET_IN_MB =
+  private static final int FETCH_SIZE = 10000;
+
+  private final float udfReaderMemoryBudgetInMB =
       IoTDBDescriptor.getInstance().getConfig().getUdfReaderMemoryBudgetInMB();
-  protected static final float UDF_TRANSFORMER_MEMORY_BUDGET_IN_MB =
+  private final float udfTransformerMemoryBudgetInMB =
       IoTDBDescriptor.getInstance().getConfig().getUdfTransformerMemoryBudgetInMB();
-  protected static final float UDF_COLLECTOR_MEMORY_BUDGET_IN_MB =
+  private final float udfCollectorMemoryBudgetInMB =
       IoTDBDescriptor.getInstance().getConfig().getUdfCollectorMemoryBudgetInMB();
 
   private final OperatorContext operatorContext;
-
+  private final Operator inputOperator;
+  private final List<TSDataType> inputDataTypes;
   private final Expression[] outputExpressions;
   private final UDTFContext udtfContext;
+  private final boolean keepNull;
 
+  private IUDFInputDataSet inputDataset;
   private LayerPointReader[] transformers;
+  private TimeSelector timeHeap;
 
   public TransformOperator(
       OperatorContext operatorContext,
+      Operator inputOperator,
       List<TSDataType> inputDataTypes,
       Expression[] outputExpressions,
-      UDTFContext udtfContext)
+      UDTFContext udtfContext,
+      boolean keepNull)
       throws QueryProcessException, IOException {
     this.operatorContext = operatorContext;
-
+    this.inputOperator = inputOperator;
+    this.inputDataTypes = inputDataTypes;
     this.outputExpressions = outputExpressions;
     this.udtfContext = udtfContext;
+    this.keepNull = keepNull;
 
-    initTransformers(inputDataTypes);
+    initInputDataset(inputDataTypes);
+    initTransformers();
+    initTimeHeap();
   }
 
-  protected void initTransformers(List<TSDataType> inputDataTypes)
-      throws QueryProcessException, IOException {
+  private void initInputDataset(List<TSDataType> inputDataTypes) {
+    inputDataset = new TsBlockInputDataSet(inputOperator, inputDataTypes);
+  }
+
+  private void initTransformers() throws QueryProcessException, IOException {
     UDFRegistrationService.getInstance().acquireRegistrationLock();
     // This statement must be surrounded by the registration lock.
     UDFClassLoaderManager.getInstance().initializeUDFQuery(operatorContext.getOperatorId());
@@ -77,12 +101,10 @@ public class TransformOperator implements ProcessOperator {
           new EvaluationDAGBuilder(
                   operatorContext.getOperatorId(),
                   new RawQueryInputLayer(
-                      operatorContext.getOperatorId(),
-                      UDF_READER_MEMORY_BUDGET_IN_MB,
-                      new TsBlockInputDataSet(this, inputDataTypes)),
+                      operatorContext.getOperatorId(), udfReaderMemoryBudgetInMB, inputDataset),
                   outputExpressions,
                   udtfContext,
-                  UDF_TRANSFORMER_MEMORY_BUDGET_IN_MB + UDF_COLLECTOR_MEMORY_BUDGET_IN_MB)
+                  udfTransformerMemoryBudgetInMB + udfCollectorMemoryBudgetInMB)
               .buildLayerMemoryAssigner()
               .buildResultColumnPointReaders()
               .getOutputPointReaders();
@@ -91,28 +113,116 @@ public class TransformOperator implements ProcessOperator {
     }
   }
 
-  @Override
-  public OperatorContext getOperatorContext() {
-    return operatorContext;
+  private void initTimeHeap() throws QueryProcessException, IOException {
+    timeHeap = new TimeSelector(transformers.length << 1, true);
+    for (LayerPointReader reader : transformers) {
+      iterateReaderToNextValid(reader);
+    }
   }
 
-  @Override
-  public TsBlock next() {
-    return null;
+  private void iterateReaderToNextValid(LayerPointReader reader)
+      throws QueryProcessException, IOException {
+    // Since a constant operand is not allowed to be a result column, the reader will not be
+    // a ConstantLayerPointReader.
+    // If keepNull is false, we must iterate the reader until a non-null row is returned.
+    while (reader.next()) {
+      if (reader.isCurrentNull() && !keepNull) {
+        reader.readyForNext();
+        continue;
+      }
+      timeHeap.add(reader.currentTime());
+      break;
+    }
   }
 
   @Override
   public boolean hasNext() {
-    return false;
+    return !timeHeap.isEmpty();
+  }
+
+  @Override
+  public TsBlock next() {
+    final TsBlockBuilder tsBlockBuilder = TsBlockBuilder.createWithOnlyTimeColumn();
+    tsBlockBuilder.buildValueColumnBuilders(inputDataTypes);
+
+    final TimeColumnBuilder timeBuilder = tsBlockBuilder.getTimeColumnBuilder();
+    final ColumnBuilder[] columnBuilders = tsBlockBuilder.getValueColumnBuilders();
+    final int columnCount = columnBuilders.length;
+
+    int rowCount = 0;
+    try {
+      while (rowCount < FETCH_SIZE && !timeHeap.isEmpty()) {
+        long minTime = timeHeap.pollFirst();
+
+        timeBuilder.writeLong(minTime);
+
+        for (int i = 0; i < columnCount; ++i) {
+          LayerPointReader reader = transformers[i];
+
+          if (!reader.next() || reader.currentTime() != minTime || reader.isCurrentNull()) {
+            columnBuilders[i].appendNull();
+            continue;
+          }
+
+          TSDataType type = reader.getDataType();
+          switch (type) {
+            case INT32:
+              columnBuilders[i].writeInt(reader.currentInt());
+              break;
+            case INT64:
+              columnBuilders[i].writeLong(reader.currentLong());
+              break;
+            case FLOAT:
+              columnBuilders[i].writeFloat(reader.currentFloat());
+              break;
+            case DOUBLE:
+              columnBuilders[i].writeDouble(reader.currentDouble());
+              break;
+            case BOOLEAN:
+              columnBuilders[i].writeBoolean(reader.currentBoolean());
+              break;
+            case TEXT:
+              columnBuilders[i].writeBinary(reader.currentBinary());
+              break;
+            default:
+              throw new UnSupportedDataTypeException(
+                  String.format("Data type %s is not supported.", type));
+          }
+
+          reader.readyForNext();
+
+          iterateReaderToNextValid(reader);
+        }
+
+        ++rowCount;
+      }
+    } catch (Exception e) {
+      // TODO: throw here?
+      throw new RuntimeException(e);
+    }
+
+    return tsBlockBuilder.build();
   }
 
   @Override
   public void close() throws Exception {
     udtfContext.finalizeUDFExecutors(operatorContext.getOperatorId());
+
+    inputOperator.close();
+  }
+
+  @Override
+  public ListenableFuture<Void> isBlocked() {
+    return inputOperator.isBlocked();
   }
 
   @Override
   public boolean isFinished() throws IOException {
-    return false;
+    return !hasNext();
+  }
+
+  @Override
+  public OperatorContext getOperatorContext() {
+    return operatorContext;
   }
 }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlockBuilder.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlockBuilder.java
index 909533553f..8de927b785 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlockBuilder.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlockBuilder.java
@@ -19,7 +19,16 @@
 package org.apache.iotdb.tsfile.read.common.block;
 
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.read.common.block.column.*;
+import org.apache.iotdb.tsfile.read.common.block.column.BinaryColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.BooleanColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.DoubleColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.FloatColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.IntColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.LongColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
 
 import java.util.List;
 
@@ -223,6 +232,10 @@ public class TsBlockBuilder {
     return valueColumnBuilders[channel];
   }
 
+  public ColumnBuilder[] getValueColumnBuilders() {
+    return valueColumnBuilders;
+  }
+
   public TSDataType getType(int channel) {
     return types.get(channel);
   }