You are viewing a plain-text version of this content. The canonical link for it (originally a hyperlink on the word "here") was not preserved in this rendering.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2022/04/17 10:00:39 UTC

[iotdb] branch iotdb-2911 created (now 9fe215d637)

This is an automated email from the ASF dual-hosted git repository.

rong pushed a change to branch iotdb-2911
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at 9fe215d637 [IOTDB-2911] TransformOperator: operator implementation for nested expression evaluation

This branch includes the following new commits:

     new 9fe215d637 [IOTDB-2911] TransformOperator: operator implementation for nested expression evaluation

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: [IOTDB-2911] TransformOperator: operator implementation for nested expression evaluation

Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch iotdb-2911
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 9fe215d637ca2bdc48c8bbc94f0b75dbe6c3a1b7
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Sun Apr 17 18:00:21 2022 +0800

    [IOTDB-2911] TransformOperator: operator implementation for nested expression evaluation
---
 .../db/mpp/operator/process/TransformOperator.java | 152 ++++++++++++++++++---
 .../tsfile/read/common/block/TsBlockBuilder.java   |  15 +-
 2 files changed, 145 insertions(+), 22 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TransformOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TransformOperator.java
index e2c452874f..48d0c71185 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TransformOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TransformOperator.java
@@ -21,7 +21,9 @@ package org.apache.iotdb.db.mpp.operator.process;
 
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.mpp.operator.Operator;
 import org.apache.iotdb.db.mpp.operator.OperatorContext;
+import org.apache.iotdb.db.query.dataset.IUDFInputDataSet;
 import org.apache.iotdb.db.query.expression.Expression;
 import org.apache.iotdb.db.query.udf.core.executor.UDTFContext;
 import org.apache.iotdb.db.query.udf.core.layer.EvaluationDAGBuilder;
@@ -30,44 +32,66 @@ import org.apache.iotdb.db.query.udf.core.layer.TsBlockInputDataSet;
 import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
 import org.apache.iotdb.db.query.udf.service.UDFClassLoaderManager;
 import org.apache.iotdb.db.query.udf.service.UDFRegistrationService;
+import org.apache.iotdb.db.utils.datastructure.TimeSelector;
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
+
+import com.google.common.util.concurrent.ListenableFuture;
 
 import java.io.IOException;
 import java.util.List;
 
 public class TransformOperator implements ProcessOperator {
 
-  protected static final float UDF_READER_MEMORY_BUDGET_IN_MB =
+  private static final int FETCH_SIZE = 10000;
+
+  private final float udfReaderMemoryBudgetInMB =
       IoTDBDescriptor.getInstance().getConfig().getUdfReaderMemoryBudgetInMB();
-  protected static final float UDF_TRANSFORMER_MEMORY_BUDGET_IN_MB =
+  private final float udfTransformerMemoryBudgetInMB =
       IoTDBDescriptor.getInstance().getConfig().getUdfTransformerMemoryBudgetInMB();
-  protected static final float UDF_COLLECTOR_MEMORY_BUDGET_IN_MB =
+  private final float udfCollectorMemoryBudgetInMB =
       IoTDBDescriptor.getInstance().getConfig().getUdfCollectorMemoryBudgetInMB();
 
   private final OperatorContext operatorContext;
-
+  private final Operator inputOperator;
+  private final List<TSDataType> inputDataTypes;
   private final Expression[] outputExpressions;
   private final UDTFContext udtfContext;
+  private final boolean keepNull;
 
+  private IUDFInputDataSet inputDataset;
   private LayerPointReader[] transformers;
+  private TimeSelector timeHeap;
 
   public TransformOperator(
       OperatorContext operatorContext,
+      Operator inputOperator,
       List<TSDataType> inputDataTypes,
       Expression[] outputExpressions,
-      UDTFContext udtfContext)
+      UDTFContext udtfContext,
+      boolean keepNull)
       throws QueryProcessException, IOException {
     this.operatorContext = operatorContext;
-
+    this.inputOperator = inputOperator;
+    this.inputDataTypes = inputDataTypes;
     this.outputExpressions = outputExpressions;
     this.udtfContext = udtfContext;
+    this.keepNull = keepNull;
 
-    initTransformers(inputDataTypes);
+    initInputDataset(inputDataTypes);
+    initTransformers();
+    initTimeHeap();
   }
 
-  protected void initTransformers(List<TSDataType> inputDataTypes)
-      throws QueryProcessException, IOException {
+  private void initInputDataset(List<TSDataType> inputDataTypes) {
+    inputDataset = new TsBlockInputDataSet(inputOperator, inputDataTypes);
+  }
+
+  private void initTransformers() throws QueryProcessException, IOException {
     UDFRegistrationService.getInstance().acquireRegistrationLock();
     // This statement must be surrounded by the registration lock.
     UDFClassLoaderManager.getInstance().initializeUDFQuery(operatorContext.getOperatorId());
@@ -77,12 +101,10 @@ public class TransformOperator implements ProcessOperator {
           new EvaluationDAGBuilder(
                   operatorContext.getOperatorId(),
                   new RawQueryInputLayer(
-                      operatorContext.getOperatorId(),
-                      UDF_READER_MEMORY_BUDGET_IN_MB,
-                      new TsBlockInputDataSet(this, inputDataTypes)),
+                      operatorContext.getOperatorId(), udfReaderMemoryBudgetInMB, inputDataset),
                   outputExpressions,
                   udtfContext,
-                  UDF_TRANSFORMER_MEMORY_BUDGET_IN_MB + UDF_COLLECTOR_MEMORY_BUDGET_IN_MB)
+                  udfTransformerMemoryBudgetInMB + udfCollectorMemoryBudgetInMB)
               .buildLayerMemoryAssigner()
               .buildResultColumnPointReaders()
               .getOutputPointReaders();
@@ -91,28 +113,116 @@ public class TransformOperator implements ProcessOperator {
     }
   }
 
-  @Override
-  public OperatorContext getOperatorContext() {
-    return operatorContext;
+  private void initTimeHeap() throws QueryProcessException, IOException {
+    timeHeap = new TimeSelector(transformers.length << 1, true);
+    for (LayerPointReader reader : transformers) {
+      iterateReaderToNextValid(reader);
+    }
   }
 
-  @Override
-  public TsBlock next() {
-    return null;
+  private void iterateReaderToNextValid(LayerPointReader reader)
+      throws QueryProcessException, IOException {
+    // Since a constant operand is not allowed to be a result column, the reader will not be
+    // a ConstantLayerPointReader.
+    // If keepNull is false, we must iterate the reader until a non-null row is returned.
+    while (reader.next()) {
+      if (reader.isCurrentNull() && !keepNull) {
+        reader.readyForNext();
+        continue;
+      }
+      timeHeap.add(reader.currentTime());
+      break;
+    }
   }
 
   @Override
   public boolean hasNext() {
-    return false;
+    return !timeHeap.isEmpty();
+  }
+
+  @Override
+  public TsBlock next() {
+    final TsBlockBuilder tsBlockBuilder = TsBlockBuilder.createWithOnlyTimeColumn();
+    tsBlockBuilder.buildValueColumnBuilders(inputDataTypes);
+
+    final TimeColumnBuilder timeBuilder = tsBlockBuilder.getTimeColumnBuilder();
+    final ColumnBuilder[] columnBuilders = tsBlockBuilder.getValueColumnBuilders();
+    final int columnCount = columnBuilders.length;
+
+    int rowCount = 0;
+    try {
+      while (rowCount < FETCH_SIZE && !timeHeap.isEmpty()) {
+        long minTime = timeHeap.pollFirst();
+
+        timeBuilder.writeLong(minTime);
+
+        for (int i = 0; i < columnCount; ++i) {
+          LayerPointReader reader = transformers[i];
+
+          if (!reader.next() || reader.currentTime() != minTime || reader.isCurrentNull()) {
+            columnBuilders[i].appendNull();
+            continue;
+          }
+
+          TSDataType type = reader.getDataType();
+          switch (type) {
+            case INT32:
+              columnBuilders[i].writeInt(reader.currentInt());
+              break;
+            case INT64:
+              columnBuilders[i].writeLong(reader.currentLong());
+              break;
+            case FLOAT:
+              columnBuilders[i].writeFloat(reader.currentFloat());
+              break;
+            case DOUBLE:
+              columnBuilders[i].writeDouble(reader.currentDouble());
+              break;
+            case BOOLEAN:
+              columnBuilders[i].writeBoolean(reader.currentBoolean());
+              break;
+            case TEXT:
+              columnBuilders[i].writeBinary(reader.currentBinary());
+              break;
+            default:
+              throw new UnSupportedDataTypeException(
+                  String.format("Data type %s is not supported.", type));
+          }
+
+          reader.readyForNext();
+
+          iterateReaderToNextValid(reader);
+        }
+
+        ++rowCount;
+      }
+    } catch (Exception e) {
+      // TODO: throw here?
+      throw new RuntimeException(e);
+    }
+
+    return tsBlockBuilder.build();
   }
 
   @Override
   public void close() throws Exception {
     udtfContext.finalizeUDFExecutors(operatorContext.getOperatorId());
+
+    inputOperator.close();
+  }
+
+  @Override
+  public ListenableFuture<Void> isBlocked() {
+    return inputOperator.isBlocked();
   }
 
   @Override
   public boolean isFinished() throws IOException {
-    return false;
+    return !hasNext();
+  }
+
+  @Override
+  public OperatorContext getOperatorContext() {
+    return operatorContext;
   }
 }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlockBuilder.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlockBuilder.java
index 909533553f..8de927b785 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlockBuilder.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlockBuilder.java
@@ -19,7 +19,16 @@
 package org.apache.iotdb.tsfile.read.common.block;
 
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.read.common.block.column.*;
+import org.apache.iotdb.tsfile.read.common.block.column.BinaryColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.BooleanColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.DoubleColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.FloatColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.IntColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.LongColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
 
 import java.util.List;
 
@@ -223,6 +232,10 @@ public class TsBlockBuilder {
     return valueColumnBuilders[channel];
   }
 
+  public ColumnBuilder[] getValueColumnBuilders() {
+    return valueColumnBuilders;
+  }
+
   public TSDataType getType(int channel) {
     return types.get(channel);
   }