You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2022/04/12 13:43:29 UTC

[iotdb] branch udf-operator updated: basic structure of TransformOperator

This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch udf-operator
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/udf-operator by this push:
     new ead674615e basic structure of TransformOperator
ead674615e is described below

commit ead674615ea55978bece2c83e76fd242af457830
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Tue Apr 12 21:43:13 2022 +0800

    basic structure of TransformOperator
---
 .../db/mpp/operator/process/TransformOperator.java | 118 +++++++++++++++++++++
 .../query/udf/core/layer/EvaluationDAGBuilder.java |  23 ++--
 2 files changed, 130 insertions(+), 11 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TransformOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TransformOperator.java
new file mode 100644
index 0000000000..e2c452874f
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TransformOperator.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.operator.process;
+
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.mpp.operator.OperatorContext;
+import org.apache.iotdb.db.query.expression.Expression;
+import org.apache.iotdb.db.query.udf.core.executor.UDTFContext;
+import org.apache.iotdb.db.query.udf.core.layer.EvaluationDAGBuilder;
+import org.apache.iotdb.db.query.udf.core.layer.RawQueryInputLayer;
+import org.apache.iotdb.db.query.udf.core.layer.TsBlockInputDataSet;
+import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
+import org.apache.iotdb.db.query.udf.service.UDFClassLoaderManager;
+import org.apache.iotdb.db.query.udf.service.UDFRegistrationService;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+
+import java.io.IOException;
+import java.util.List;
+
+public class TransformOperator implements ProcessOperator {
+
+  protected static final float UDF_READER_MEMORY_BUDGET_IN_MB =
+      IoTDBDescriptor.getInstance().getConfig().getUdfReaderMemoryBudgetInMB();
+  protected static final float UDF_TRANSFORMER_MEMORY_BUDGET_IN_MB =
+      IoTDBDescriptor.getInstance().getConfig().getUdfTransformerMemoryBudgetInMB();
+  protected static final float UDF_COLLECTOR_MEMORY_BUDGET_IN_MB =
+      IoTDBDescriptor.getInstance().getConfig().getUdfCollectorMemoryBudgetInMB();
+
+  private final OperatorContext operatorContext;
+
+  private final Expression[] outputExpressions;
+  private final UDTFContext udtfContext;
+
+  private LayerPointReader[] transformers;
+
+  public TransformOperator(
+      OperatorContext operatorContext,
+      List<TSDataType> inputDataTypes,
+      Expression[] outputExpressions,
+      UDTFContext udtfContext)
+      throws QueryProcessException, IOException {
+    this.operatorContext = operatorContext;
+
+    this.outputExpressions = outputExpressions;
+    this.udtfContext = udtfContext;
+
+    initTransformers(inputDataTypes);
+  }
+
+  protected void initTransformers(List<TSDataType> inputDataTypes)
+      throws QueryProcessException, IOException {
+    UDFRegistrationService.getInstance().acquireRegistrationLock();
+    // This statement must be surrounded by the registration lock.
+    UDFClassLoaderManager.getInstance().initializeUDFQuery(operatorContext.getOperatorId());
+    try {
+      // UDF executors will be initialized at the same time
+      transformers =
+          new EvaluationDAGBuilder(
+                  operatorContext.getOperatorId(),
+                  new RawQueryInputLayer(
+                      operatorContext.getOperatorId(),
+                      UDF_READER_MEMORY_BUDGET_IN_MB,
+                      new TsBlockInputDataSet(this, inputDataTypes)),
+                  outputExpressions,
+                  udtfContext,
+                  UDF_TRANSFORMER_MEMORY_BUDGET_IN_MB + UDF_COLLECTOR_MEMORY_BUDGET_IN_MB)
+              .buildLayerMemoryAssigner()
+              .buildResultColumnPointReaders()
+              .getOutputPointReaders();
+    } finally {
+      UDFRegistrationService.getInstance().releaseRegistrationLock();
+    }
+  }
+
+  @Override
+  public OperatorContext getOperatorContext() {
+    return operatorContext;
+  }
+
+  @Override
+  public TsBlock next() {
+    return null;
+  }
+
+  @Override
+  public boolean hasNext() {
+    return false;
+  }
+
+  @Override
+  public void close() throws Exception {
+    udtfContext.finalizeUDFExecutors(operatorContext.getOperatorId());
+  }
+
+  @Override
+  public boolean isFinished() throws IOException {
+    return false;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/EvaluationDAGBuilder.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/EvaluationDAGBuilder.java
index de98be426a..debc89d250 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/EvaluationDAGBuilder.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/EvaluationDAGBuilder.java
@@ -32,10 +32,11 @@ import java.util.Map;
 public class EvaluationDAGBuilder {
 
   private final long queryId;
+
   private final RawQueryInputLayer inputLayer;
 
-  private final Expression[] resultColumnExpressions;
-  private final LayerPointReader[] resultColumnPointReaders;
+  private final Expression[] outputExpressions;
+  private final LayerPointReader[] outputPointReaders;
 
   private final UDTFContext udtfContext;
 
@@ -50,17 +51,17 @@ public class EvaluationDAGBuilder {
 
   public EvaluationDAGBuilder(
       long queryId,
-      Expression[] resultColumnExpressions,
       RawQueryInputLayer inputLayer,
+      Expression[] outputExpressions,
       UDTFContext udtfContext,
       float memoryBudgetInMB) {
     this.queryId = queryId;
-    this.resultColumnExpressions = resultColumnExpressions;
     this.inputLayer = inputLayer;
+    this.outputExpressions = outputExpressions;
     this.udtfContext = udtfContext;
 
     int size = inputLayer.getInputColumnCount();
-    resultColumnPointReaders = new LayerPointReader[size];
+    outputPointReaders = new LayerPointReader[size];
 
     memoryAssigner = new LayerMemoryAssigner(memoryBudgetInMB);
 
@@ -69,7 +70,7 @@ public class EvaluationDAGBuilder {
   }
 
   public EvaluationDAGBuilder buildLayerMemoryAssigner() {
-    for (Expression expression : resultColumnExpressions) {
+    for (Expression expression : outputExpressions) {
       expression.updateStatisticsForMemoryAssigner(memoryAssigner);
     }
     memoryAssigner.build();
@@ -78,9 +79,9 @@ public class EvaluationDAGBuilder {
 
   public EvaluationDAGBuilder buildResultColumnPointReaders()
       throws QueryProcessException, IOException {
-    for (int i = 0; i < resultColumnExpressions.length; ++i) {
-      resultColumnPointReaders[i] =
-          resultColumnExpressions[i]
+    for (int i = 0; i < outputExpressions.length; ++i) {
+      outputPointReaders[i] =
+          outputExpressions[i]
               .constructIntermediateLayer(
                   queryId,
                   udtfContext,
@@ -93,7 +94,7 @@ public class EvaluationDAGBuilder {
     return this;
   }
 
-  public LayerPointReader[] getResultColumnPointReaders() {
-    return resultColumnPointReaders;
+  public LayerPointReader[] getOutputPointReaders() {
+    return outputPointReaders;
   }
 }