You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2022/04/25 14:08:16 UTC

[iotdb] branch master updated: [IOTDB-2912] MPP: FilterOperator (#5645)

This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new f7bd1ad8c3 [IOTDB-2912] MPP: FilterOperator (#5645)
f7bd1ad8c3 is described below

commit f7bd1ad8c3961fbb5d54e4a22060a745ac2ec9b0
Author: flashzxi <39...@users.noreply.github.com>
AuthorDate: Mon Apr 25 22:08:10 2022 +0800

    [IOTDB-2912] MPP: FilterOperator (#5645)
    
    Co-authored-by: Steve Yurong Su <ro...@apache.org>
---
 .../db/mpp/operator/process/FilterOperator.java    | 155 +++++++++++++++++++++
 .../db/mpp/operator/process/TransformOperator.java | 125 +++++++++--------
 2 files changed, 224 insertions(+), 56 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/FilterOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/FilterOperator.java
new file mode 100644
index 0000000000..1421bbcac6
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/FilterOperator.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.operator.process;
+
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.mpp.operator.Operator;
+import org.apache.iotdb.db.mpp.operator.OperatorContext;
+import org.apache.iotdb.db.query.expression.Expression;
+import org.apache.iotdb.db.query.udf.core.executor.UDTFContext;
+import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Operator that evaluates a boolean filter expression alongside the output expressions and only
+ * emits rows at timestamps where the filter evaluates to {@code true}.
+ *
+ * <p>The filter expression is appended after the output expressions before they are handed to
+ * {@link TransformOperator}. As a consequence the last element of {@code transformers} is the
+ * filter reader and all preceding elements are the readers of the output columns.
+ */
+public class FilterOperator extends TransformOperator {
+
+  /**
+   * Reader over the filter expression (the last element of {@code transformers}).
+   *
+   * <p>Deliberately declared without an initializer: it is assigned in {@link
+   * #initTransformers()}, which the superclass constructor invokes, and a field initializer would
+   * run afterwards and overwrite that assignment.
+   */
+  private LayerPointReader filterPointReader;
+
+  /**
+   * Creates a filter operator.
+   *
+   * @param operatorContext context of this operator
+   * @param inputOperator child operator providing the input data
+   * @param inputDataTypes data types of the input columns
+   * @param filterExpression expression deciding which rows are kept; must evaluate to BOOLEAN
+   * @param outputExpressions expressions producing the output columns
+   * @param udtfContext UDTF context forwarded to the superclass
+   * @throws QueryProcessException if the superclass initialization fails
+   * @throws IOException if the superclass initialization fails while reading input
+   */
+  public FilterOperator(
+      OperatorContext operatorContext,
+      Operator inputOperator,
+      List<TSDataType> inputDataTypes,
+      Expression filterExpression,
+      Expression[] outputExpressions,
+      UDTFContext udtfContext)
+      throws QueryProcessException, IOException {
+    // NOTE(review): the trailing `false` is presumably the superclass' keepNull flag - confirm.
+    super(
+        operatorContext,
+        inputOperator,
+        inputDataTypes,
+        bindExpressions(filterExpression, outputExpressions),
+        udtfContext,
+        false);
+  }
+
+  /**
+   * Builds the expression array passed to the superclass: all output expressions in their
+   * original order, followed by the filter expression as the last element.
+   */
+  private static Expression[] bindExpressions(
+      Expression filterExpression, Expression[] outputExpressions) {
+    Expression[] expressions = new Expression[outputExpressions.length + 1];
+    System.arraycopy(outputExpressions, 0, expressions, 0, outputExpressions.length);
+    expressions[expressions.length - 1] = filterExpression;
+    return expressions;
+  }
+
+  /**
+   * Initializes all transformers via the superclass and picks the last one as the filter reader.
+   *
+   * @throws UnSupportedDataTypeException if the filter expression is not of type BOOLEAN
+   */
+  @Override
+  protected void initTransformers() throws QueryProcessException, IOException {
+    super.initTransformers();
+
+    filterPointReader = transformers[transformers.length - 1];
+    if (filterPointReader.getDataType() != TSDataType.BOOLEAN) {
+      throw new UnSupportedDataTypeException(
+          String.format(
+              "Data type of the filter expression should be BOOLEAN, but %s is received.",
+              filterPointReader.getDataType()));
+    }
+  }
+
+  /** Positions the filter reader on its first {@code true} point (no time heap is used here). */
+  @Override
+  protected void initLayerPointReaders() throws QueryProcessException, IOException {
+    iterateFilterReaderToNextValid();
+  }
+
+  /**
+   * Skips filter points that are null or {@code false}. Afterwards the filter reader is either
+   * exhausted or positioned on a {@code true} point. A point that is already {@code true} is NOT
+   * consumed by this method.
+   */
+  private void iterateFilterReaderToNextValid() throws QueryProcessException, IOException {
+    while (filterPointReader.next()
+        && (filterPointReader.isCurrentNull() || !filterPointReader.currentBoolean())) {
+      filterPointReader.readyForNext();
+    }
+  }
+
+  /**
+   * Produces the next block of at most {@code FETCH_SIZE} rows. A row is written for a timestamp
+   * only if the filter is {@code true} there and at least one output column has a non-null value
+   * at exactly that timestamp.
+   *
+   * @return the built block (possibly empty)
+   * @throws RuntimeException wrapping any exception raised while reading
+   */
+  @Override
+  public TsBlock next() {
+    final TsBlockBuilder tsBlockBuilder = TsBlockBuilder.createWithOnlyTimeColumn();
+
+    // The last transformer is the filter itself and never part of the output.
+    final int outputColumnCount = transformers.length - 1;
+
+    if (outputDataTypes == null) {
+      outputDataTypes = new ArrayList<>(outputColumnCount);
+      for (int i = 0; i < outputColumnCount; ++i) {
+        outputDataTypes.add(transformers[i].getDataType());
+      }
+    }
+    tsBlockBuilder.buildValueColumnBuilders(outputDataTypes);
+
+    final TimeColumnBuilder timeBuilder = tsBlockBuilder.getTimeColumnBuilder();
+    final ColumnBuilder[] columnBuilders = tsBlockBuilder.getValueColumnBuilders();
+
+    try {
+      int rowCount = 0;
+      while (rowCount < FETCH_SIZE && filterPointReader.next()) {
+        final long currentTime = filterPointReader.currentTime();
+
+        boolean hasAtLeastOneValid = false;
+        for (int i = 0; i < outputColumnCount; ++i) {
+          // No short-circuit on purpose: every value reader has to be advanced to currentTime.
+          if (advanceValueReaderTo(transformers[i], currentTime)) {
+            hasAtLeastOneValid = true;
+          }
+        }
+
+        if (hasAtLeastOneValid) {
+          timeBuilder.writeLong(currentTime);
+          for (int i = 0; i < outputColumnCount; ++i) {
+            collectDataPoint(transformers[i], columnBuilders[i], currentTime);
+          }
+          ++rowCount;
+        }
+
+        // Consume the filter point that was just handled. iterateFilterReaderToNextValid() only
+        // skips null/false points, so without this call the reader would stay on the same true
+        // point and this loop would never make progress.
+        filterPointReader.readyForNext();
+        iterateFilterReaderToNextValid();
+      }
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+
+    return tsBlockBuilder.build();
+  }
+
+  /**
+   * Advances {@code reader} past null points and points earlier than {@code currentTime}.
+   *
+   * @return {@code true} iff the reader ends up on a non-null point whose timestamp is exactly
+   *     {@code currentTime}; {@code false} if it is positioned later or is exhausted
+   */
+  private boolean advanceValueReaderTo(LayerPointReader reader, long currentTime)
+      throws QueryProcessException, IOException {
+    while (reader.next()) {
+      if (!reader.isCurrentNull() && reader.currentTime() >= currentTime) {
+        return reader.currentTime() == currentTime;
+      }
+      reader.readyForNext();
+    }
+    // Exhausted reader: do not query currentTime(), whose value is not meaningful without a
+    // current point and could accidentally match currentTime.
+    return false;
+  }
+
+  /**
+   * Reports whether the filter reader still holds a point. The reader is kept positioned on a
+   * {@code true} point (or exhausted) by {@link #initLayerPointReaders()} and {@link #next()}.
+   *
+   * @throws RuntimeException wrapping any exception raised while reading
+   */
+  @Override
+  public boolean hasNext() {
+    try {
+      return filterPointReader.next();
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TransformOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TransformOperator.java
index cd3355d395..f74e17acc0 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TransformOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TransformOperator.java
@@ -43,29 +43,31 @@ import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
 import com.google.common.util.concurrent.ListenableFuture;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 
 public class TransformOperator implements ProcessOperator {
 
-  private static final int FETCH_SIZE = 10000;
+  protected static final int FETCH_SIZE = 10000;
 
-  private final float udfReaderMemoryBudgetInMB =
+  protected final float udfReaderMemoryBudgetInMB =
       IoTDBDescriptor.getInstance().getConfig().getUdfReaderMemoryBudgetInMB();
-  private final float udfTransformerMemoryBudgetInMB =
+  protected final float udfTransformerMemoryBudgetInMB =
       IoTDBDescriptor.getInstance().getConfig().getUdfTransformerMemoryBudgetInMB();
-  private final float udfCollectorMemoryBudgetInMB =
+  protected final float udfCollectorMemoryBudgetInMB =
       IoTDBDescriptor.getInstance().getConfig().getUdfCollectorMemoryBudgetInMB();
 
-  private final OperatorContext operatorContext;
-  private final Operator inputOperator;
-  private final List<TSDataType> inputDataTypes;
-  private final Expression[] outputExpressions;
-  private final UDTFContext udtfContext;
-  private final boolean keepNull;
+  protected final OperatorContext operatorContext;
+  protected final Operator inputOperator;
+  protected final List<TSDataType> inputDataTypes;
+  protected final Expression[] outputExpressions;
+  protected final UDTFContext udtfContext;
+  protected final boolean keepNull;
 
-  private IUDFInputDataSet inputDataset;
-  private LayerPointReader[] transformers;
-  private TimeSelector timeHeap;
+  protected IUDFInputDataSet inputDataset;
+  protected LayerPointReader[] transformers;
+  protected TimeSelector timeHeap;
+  protected List<TSDataType> outputDataTypes;
 
   public TransformOperator(
       OperatorContext operatorContext,
@@ -84,18 +86,18 @@ public class TransformOperator implements ProcessOperator {
 
     initInputDataset(inputDataTypes);
     initTransformers();
-    initTimeHeap();
+    initLayerPointReaders();
   }
 
   private void initInputDataset(List<TSDataType> inputDataTypes) {
     inputDataset = new TsBlockInputDataSet(inputOperator, inputDataTypes);
   }
 
-  private void initTransformers() throws QueryProcessException, IOException {
+  protected void initTransformers() throws QueryProcessException, IOException {
     UDFRegistrationService.getInstance().acquireRegistrationLock();
-    // This statement must be surrounded by the registration lock.
-    UDFClassLoaderManager.getInstance().initializeUDFQuery(operatorContext.getOperatorId());
     try {
+      // This statement must be surrounded by the registration lock.
+      UDFClassLoaderManager.getInstance().initializeUDFQuery(operatorContext.getOperatorId());
       // UDF executors will be initialized at the same time
       transformers =
           new EvaluationDAGBuilder(
@@ -113,7 +115,7 @@ public class TransformOperator implements ProcessOperator {
     }
   }
 
-  private void initTimeHeap() throws QueryProcessException, IOException {
+  protected void initLayerPointReaders() throws QueryProcessException, IOException {
     timeHeap = new TimeSelector(transformers.length << 1, true);
     for (LayerPointReader reader : transformers) {
       iterateReaderToNextValid(reader);
@@ -143,67 +145,78 @@ public class TransformOperator implements ProcessOperator {
   @Override
   public TsBlock next() {
     final TsBlockBuilder tsBlockBuilder = TsBlockBuilder.createWithOnlyTimeColumn();
-    tsBlockBuilder.buildValueColumnBuilders(inputDataTypes);
+
+    if (outputDataTypes == null) {
+      outputDataTypes = new ArrayList<>();
+      for (LayerPointReader reader : transformers) {
+        outputDataTypes.add(reader.getDataType());
+      }
+    }
+    tsBlockBuilder.buildValueColumnBuilders(outputDataTypes);
 
     final TimeColumnBuilder timeBuilder = tsBlockBuilder.getTimeColumnBuilder();
     final ColumnBuilder[] columnBuilders = tsBlockBuilder.getValueColumnBuilders();
     final int columnCount = columnBuilders.length;
 
-    int rowCount = 0;
     try {
+      int rowCount = 0;
       while (rowCount < FETCH_SIZE && !timeHeap.isEmpty()) {
-        long minTime = timeHeap.pollFirst();
+        final long currentTime = timeHeap.pollFirst();
 
-        timeBuilder.writeLong(minTime);
+        // time
+        timeBuilder.writeLong(currentTime);
 
+        // values
         for (int i = 0; i < columnCount; ++i) {
           LayerPointReader reader = transformers[i];
-
-          if (!reader.next() || reader.currentTime() != minTime || reader.isCurrentNull()) {
-            columnBuilders[i].appendNull();
-            continue;
-          }
-
-          TSDataType type = reader.getDataType();
-          switch (type) {
-            case INT32:
-              columnBuilders[i].writeInt(reader.currentInt());
-              break;
-            case INT64:
-              columnBuilders[i].writeLong(reader.currentLong());
-              break;
-            case FLOAT:
-              columnBuilders[i].writeFloat(reader.currentFloat());
-              break;
-            case DOUBLE:
-              columnBuilders[i].writeDouble(reader.currentDouble());
-              break;
-            case BOOLEAN:
-              columnBuilders[i].writeBoolean(reader.currentBoolean());
-              break;
-            case TEXT:
-              columnBuilders[i].writeBinary(reader.currentBinary());
-              break;
-            default:
-              throw new UnSupportedDataTypeException(
-                  String.format("Data type %s is not supported.", type));
-          }
-
-          reader.readyForNext();
-
+          collectDataPoint(reader, columnBuilders[i], currentTime);
           iterateReaderToNextValid(reader);
         }
 
         ++rowCount;
       }
     } catch (Exception e) {
-      // TODO: throw here?
       throw new RuntimeException(e);
     }
 
     return tsBlockBuilder.build();
   }
 
+  /**
+   * Appends the reader's current value to {@code writer} when the reader holds a non-null point
+   * at exactly {@code currentTime}, and then marks that point as consumed. In every other case a
+   * null is appended and the reader is left where it is.
+   *
+   * @param reader source of the data point
+   * @param writer column builder receiving the value or a null
+   * @param currentTime timestamp of the row being written
+   * @throws UnSupportedDataTypeException if the reader's data type has no matching write method
+   */
+  protected void collectDataPoint(LayerPointReader reader, ColumnBuilder writer, long currentTime)
+      throws QueryProcessException, IOException {
+    final boolean hasValueAtCurrentTime =
+        reader.next() && reader.currentTime() == currentTime && !reader.isCurrentNull();
+    if (!hasValueAtCurrentTime) {
+      writer.appendNull();
+      return;
+    }
+
+    final TSDataType dataType = reader.getDataType();
+    switch (dataType) {
+      case BOOLEAN:
+        writer.writeBoolean(reader.currentBoolean());
+        break;
+      case INT32:
+        writer.writeInt(reader.currentInt());
+        break;
+      case INT64:
+        writer.writeLong(reader.currentLong());
+        break;
+      case FLOAT:
+        writer.writeFloat(reader.currentFloat());
+        break;
+      case DOUBLE:
+        writer.writeDouble(reader.currentDouble());
+        break;
+      case TEXT:
+        writer.writeBinary(reader.currentBinary());
+        break;
+      default:
+        throw new UnSupportedDataTypeException(
+            String.format("Data type %s is not supported.", dataType));
+    }
+
+    // The value has been copied out; let the reader move on to its next point.
+    reader.readyForNext();
+  }
+
   @Override
   public void close() throws Exception {
     udtfContext.finalizeUDFExecutors(operatorContext.getOperatorId());