You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by si...@apache.org on 2022/07/01 04:26:43 UTC

[pinot] branch master updated: add project/transform operator to multistage engine (#8967)

This is an automated email from the ASF dual-hosted git repository.

siddteotia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new d4a64e3853 add project/transform operator to multistage engine (#8967)
d4a64e3853 is described below

commit d4a64e3853102b220cde905f79eaede0054bc43b
Author: Rong Rong <wa...@gmail.com>
AuthorDate: Thu Jun 30 21:26:35 2022 -0700

    add project/transform operator to multistage engine (#8967)
    
    * initial to add transform operator
    
    - only reference is supported, no call yet
    - should work with any transform
    
    * also add transform operator with function call
    
    * fix function resolution
    
    Co-authored-by: Rong Rong <ro...@startree.ai>
---
 .../pinot/query/planner/logical/StagePlanner.java  |   1 +
 .../runtime/executor/WorkerQueryExecutor.java      |   5 +-
 .../query/runtime/operator/AggregateOperator.java  |   3 +
 .../query/runtime/operator/OperatorUtils.java      |  67 ++++++++
 .../query/runtime/operator/TransformOperator.java  | 185 +++++++++++++++++++++
 .../pinot/query/runtime/QueryRunnerTest.java       |  19 ++-
 6 files changed, 278 insertions(+), 2 deletions(-)

diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java
index 02c2e7fb28..cf0b218708 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java
@@ -95,6 +95,7 @@ public class StagePlanner {
   }
 
   // non-threadsafe
+  // TODO: add dataSchema (extracted from RelNode schema) to the StageNode.
   private StageNode walkRelPlan(RelNode node, int currentStageId) {
     if (isExchangeNode(node)) {
       // 1. exchangeNode always have only one input, get its input converted as a new stage root.
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java
index 379d7bdbd6..009ebcdc3e 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java
@@ -43,6 +43,7 @@ import org.apache.pinot.query.runtime.operator.AggregateOperator;
 import org.apache.pinot.query.runtime.operator.HashJoinOperator;
 import org.apache.pinot.query.runtime.operator.MailboxReceiveOperator;
 import org.apache.pinot.query.runtime.operator.MailboxSendOperator;
+import org.apache.pinot.query.runtime.operator.TransformOperator;
 import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.slf4j.Logger;
@@ -127,7 +128,9 @@ public class WorkerQueryExecutor {
     } else if (stageNode instanceof FilterNode) {
       throw new UnsupportedOperationException("Unsupported!");
     } else if (stageNode instanceof ProjectNode) {
-      throw new UnsupportedOperationException("Unsupported!");
+      ProjectNode projectNode = (ProjectNode) stageNode;
+      return new TransformOperator(getOperator(requestId, projectNode.getInputs().get(0), metadataMap),
+          projectNode.getProjects(), projectNode.getInputs().get(0).getDataSchema());
     } else {
       throw new UnsupportedOperationException(
           String.format("Stage node type %s is not supported!", stageNode.getClass().getSimpleName()));
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
index 95a5dcd03f..8e2c7ef77a 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
@@ -159,6 +159,7 @@ public class AggregateOperator extends BaseOperator<TransferableBlock> {
     switch (((RexExpression.FunctionCall) aggCall).getFunctionName()) {
       case "$SUM":
       case "$SUM0":
+      case "SUM":
         return new SumAggregationFunction(
             ExpressionContext.forIdentifier(
                 ((RexExpression.FunctionCall) aggCall).getFunctionOperands().get(0).toString()));
@@ -167,11 +168,13 @@ public class AggregateOperator extends BaseOperator<TransferableBlock> {
         return new CountAggregationFunction();
       case "$MIN":
       case "$MIN0":
+      case "MIN":
         return new MinAggregationFunction(
             ExpressionContext.forIdentifier(
                 ((RexExpression.FunctionCall) aggCall).getFunctionOperands().get(0).toString()));
       case "$MAX":
       case "$MAX0":
+      case "MAX":
         return new MaxAggregationFunction(
             ExpressionContext.forIdentifier(
                 ((RexExpression.FunctionCall) aggCall).getFunctionOperands().get(0).toString()));
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OperatorUtils.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OperatorUtils.java
new file mode 100644
index 0000000000..10974f4002
--- /dev/null
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OperatorUtils.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.operator;
+
+import java.util.HashMap;
+import java.util.Map;
+
+
+/**
+ * Helper methods shared by the multi-stage query runtime operators.
+ */
+public class OperatorUtils {
+
+  /**
+   * Operator symbols that may appear as function names in the logical plan, mapped to the token names declared for
+   * them in Parser.jj.
+   */
+  private static final Map<String, String> OPERATOR_TOKEN_MAPPING = buildOperatorTokenMapping();
+
+  private OperatorUtils() {
+    // do not instantiate.
+  }
+
+  /**
+   * Canonicalizes a function name taken from the logical plan, which uses Parser.jj tokens.
+   *
+   * <p>A name that is a known operator symbol (e.g. {@code "+"}) is replaced by its token name (e.g.
+   * {@code "PLUS"}); any other name is returned unchanged.
+   *
+   * @param functionName input function name
+   * @return the token name mapped to {@code functionName}, or {@code functionName} itself if it has no mapping
+   */
+  public static String canonicalizeFunctionName(String functionName) {
+    String tokenName = OPERATOR_TOKEN_MAPPING.get(functionName);
+    return tokenName == null ? functionName : tokenName;
+  }
+
+  /**
+   * Builds the symbol-to-token-name table consulted by {@link #canonicalizeFunctionName(String)}.
+   */
+  private static Map<String, String> buildOperatorTokenMapping() {
+    Map<String, String> mapping = new HashMap<>();
+    mapping.put("=", "EQ");
+    mapping.put(">", "GT");
+    mapping.put("<", "LT");
+    mapping.put("?", "HOOK");
+    mapping.put(":", "COLON");
+    mapping.put("<=", "LE");
+    mapping.put(">=", "GE");
+    mapping.put("<>", "NE");
+    mapping.put("!=", "NE2");
+    mapping.put("+", "PLUS");
+    mapping.put("-", "MINUS");
+    mapping.put("*", "STAR");
+    mapping.put("/", "DIVIDE");
+    mapping.put("%", "PERCENT_REMAINDER");
+    mapping.put("||", "CONCAT");
+    mapping.put("=>", "NAMED_ARGUMENT_ASSIGNMENT");
+    mapping.put("..", "DOUBLE_PERIOD");
+    mapping.put("'", "QUOTE");
+    mapping.put("\"", "DOUBLE_QUOTE");
+    mapping.put("|", "VERTICAL_BAR");
+    mapping.put("^", "CARET");
+    mapping.put("$", "DOLLAR");
+    return mapping;
+  }
+}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/TransformOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/TransformOperator.java
new file mode 100644
index 0000000000..6744e35d99
--- /dev/null
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/TransformOperator.java
@@ -0,0 +1,242 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.operator;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.function.FunctionInfo;
+import org.apache.pinot.common.function.FunctionInvoker;
+import org.apache.pinot.common.function.FunctionRegistry;
+import org.apache.pinot.common.function.FunctionUtils;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.core.common.datablock.BaseDataBlock;
+import org.apache.pinot.core.operator.BaseOperator;
+import org.apache.pinot.query.planner.logical.RexExpression;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
+
+
+/**
+ * {@code TransformOperator} evaluates a list of projection expressions against every row produced by its upstream
+ * operator, producing one output column per expression.
+ *
+ * <p>Each expression is compiled once, at construction time, into a tree of operands: a column reference or a
+ * registered function whose arguments are themselves operands. Function names are passed through
+ * {@link OperatorUtils#canonicalizeFunctionName(String)} before being resolved in {@link FunctionRegistry}.
+ */
+public class TransformOperator extends BaseOperator<TransferableBlock> {
+  private static final String EXPLAIN_NAME = "TRANSFORM";
+  private final BaseOperator<TransferableBlock> _upstreamOperator;
+  private final List<TransformOperands> _transformOperandsList;
+  private final int _resultColumnSize;
+  private final DataSchema _resultSchema;
+
+  /**
+   * Creates the operator and compiles every projection expression.
+   *
+   * @param upstreamOperator operator supplying the input rows
+   * @param transforms projection expressions, one per output column, in output order
+   * @param upstreamDataSchema schema of the rows produced by {@code upstreamOperator}, used to resolve column
+   *     references
+   * @throws UnsupportedOperationException if an expression is neither a column reference nor a function call
+   * @throws NullPointerException if a function call cannot be resolved in the {@link FunctionRegistry}
+   */
+  public TransformOperator(BaseOperator<TransferableBlock> upstreamOperator, List<RexExpression> transforms,
+      DataSchema upstreamDataSchema) {
+    _upstreamOperator = upstreamOperator;
+    _resultColumnSize = transforms.size();
+    _transformOperandsList = new ArrayList<>(_resultColumnSize);
+    for (RexExpression rexExpression : transforms) {
+      _transformOperandsList.add(TransformOperands.toFunctionOperands(rexExpression, upstreamDataSchema));
+    }
+    String[] columnNames = new String[_resultColumnSize];
+    DataSchema.ColumnDataType[] columnDataTypes = new DataSchema.ColumnDataType[_resultColumnSize];
+    for (int i = 0; i < _resultColumnSize; i++) {
+      columnNames[i] = _transformOperandsList.get(i).getResultName();
+      columnDataTypes[i] = _transformOperandsList.get(i).getResultType();
+    }
+    _resultSchema = new DataSchema(columnNames, columnDataTypes);
+  }
+
+  @Override
+  public List<Operator> getChildOperators() {
+    // WorkerExecutor doesn't use getChildOperators, returns null here.
+    return null;
+  }
+
+  @Nullable
+  @Override
+  public String toExplainString() {
+    return EXPLAIN_NAME;
+  }
+
+  /**
+   * Pulls the next block from upstream and transforms it. Any exception raised along the way is returned as an
+   * error block instead of being thrown.
+   */
+  @Override
+  protected TransferableBlock getNextBlock() {
+    try {
+      return transform(_upstreamOperator.nextBlock());
+    } catch (Exception e) {
+      return TransferableBlockUtils.getErrorTransferableBlock(e);
+    }
+  }
+
+  /**
+   * Applies every projection to each row of {@code block}.
+   *
+   * @param block block received from the upstream operator
+   * @return an end-of-stream block if {@code block} is end-of-stream; otherwise a row block holding one output
+   *     row per input row, laid out according to this operator's result schema
+   */
+  private TransferableBlock transform(TransferableBlock block)
+      throws Exception {
+    if (TransferableBlockUtils.isEndOfStream(block)) {
+      // NOTE(review): a fresh end-of-stream block is emitted instead of forwarding the upstream one; if upstream
+      // errors are also classified as end-of-stream, their details are dropped here — confirm.
+      return TransferableBlockUtils.getEndOfStreamTransferableBlock();
+    }
+    List<Object[]> container = block.getContainer();
+    List<Object[]> resultRows = new ArrayList<>(container.size());
+    for (Object[] row : container) {
+      Object[] resultRow = new Object[_resultColumnSize];
+      for (int i = 0; i < _resultColumnSize; i++) {
+        resultRow[i] = _transformOperandsList.get(i).apply(row);
+      }
+      resultRows.add(resultRow);
+    }
+    return new TransferableBlock(resultRows, _resultSchema, BaseDataBlock.Type.ROW);
+  }
+
+  /**
+   * A compiled projection expression: exposes the name and type of the value it produces and computes that value
+   * from one upstream row.
+   */
+  private static abstract class TransformOperands {
+    protected String _resultName;
+    protected DataSchema.ColumnDataType _resultType;
+
+    /**
+     * Compiles {@code rexExpression} into an operand tree, resolving references against {@code dataSchema}.
+     *
+     * @throws UnsupportedOperationException for expressions other than input references and function calls
+     */
+    public static TransformOperands toFunctionOperands(RexExpression rexExpression, DataSchema dataSchema) {
+      if (rexExpression instanceof RexExpression.InputRef) {
+        return new ReferenceOperands((RexExpression.InputRef) rexExpression, dataSchema);
+      } else if (rexExpression instanceof RexExpression.FunctionCall) {
+        return new FunctionOperands((RexExpression.FunctionCall) rexExpression, dataSchema);
+      } else {
+        throw new UnsupportedOperationException("Unsupported RexExpression: " + rexExpression);
+      }
+    }
+
+    public String getResultName() {
+      return _resultName;
+    }
+
+    public DataSchema.ColumnDataType getResultType() {
+      return _resultType;
+    }
+
+    /**
+     * Computes this operand's value for one upstream row.
+     */
+    public abstract Object apply(Object[] row);
+  }
+
+  /**
+   * Operand that invokes a registered function on the values computed by its child operands.
+   */
+  private static class FunctionOperands extends TransformOperands {
+    private final List<TransformOperands> _childOperandList;
+    private final FunctionInvoker _functionInvoker;
+    // Argument buffer reused for every row to avoid allocating one array per invocation.
+    private final Object[] _reusableOperandHolder;
+
+    public FunctionOperands(RexExpression.FunctionCall functionCall, DataSchema dataSchema) {
+      // recursively resolve child operands.
+      List<RexExpression> operandExpressions = functionCall.getFunctionOperands();
+      _childOperandList = new ArrayList<>(operandExpressions.size());
+      for (RexExpression childRexExpression : operandExpressions) {
+        _childOperandList.add(toFunctionOperands(childRexExpression, dataSchema));
+      }
+      // The registry is looked up by canonicalized name and argument count; operator symbols such as "+" are
+      // mapped to their token names first.
+      FunctionInfo functionInfo = FunctionRegistry.getFunctionInfo(
+          OperatorUtils.canonicalizeFunctionName(functionCall.getFunctionName()), operandExpressions.size());
+      Preconditions.checkNotNull(functionInfo, "Cannot find function with Name: "
+          + functionCall.getFunctionName());
+      _functionInvoker = new FunctionInvoker(functionInfo);
+      _resultName = computeColumnName(functionCall.getFunctionName(), _childOperandList);
+      _resultType = FunctionUtils.getColumnDataType(_functionInvoker.getResultClass());
+      _reusableOperandHolder = new Object[operandExpressions.size()];
+    }
+
+    @Override
+    public Object apply(Object[] row) {
+      for (int i = 0; i < _childOperandList.size(); i++) {
+        _reusableOperandHolder[i] = _childOperandList.get(i).apply(row);
+      }
+      // NOTE(review): arguments are passed exactly as the children produce them, with no conversion to the
+      // function's declared parameter types — confirm this holds for every function reachable here.
+      return _functionInvoker.invoke(_reusableOperandHolder);
+    }
+
+    /**
+     * Builds the output column name {@code functionName(arg1,arg2,...)} from the child operands' names.
+     */
+    private static String computeColumnName(String functionName, List<TransformOperands> childOperands) {
+      StringBuilder sb = new StringBuilder();
+      sb.append(functionName);
+      sb.append('(');
+      for (int i = 0; i < childOperands.size(); i++) {
+        if (i > 0) {
+          // Separator goes only between arguments, so no trailing comma is emitted.
+          sb.append(',');
+        }
+        sb.append(childOperands.get(i).getResultName());
+      }
+      sb.append(')');
+      return sb.toString();
+    }
+  }
+
+  /**
+   * Operand that forwards the value of one upstream column unchanged.
+   */
+  private static class ReferenceOperands extends TransformOperands {
+    private final int _refIndex;
+
+    public ReferenceOperands(RexExpression.InputRef inputRef, DataSchema dataSchema) {
+      _refIndex = inputRef.getIndex();
+      _resultType = dataSchema.getColumnDataType(_refIndex);
+      _resultName = dataSchema.getColumnName(_refIndex);
+    }
+
+    @Override
+    public Object apply(Object[] row) {
+      return row[_refIndex];
+    }
+  }
+}
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
index b409a4b063..ae75dd158b 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
@@ -182,8 +182,25 @@ public class QueryRunnerTest {
         // Aggregation with multiple group key
         new Object[]{"SELECT a.col2, a.col1, SUM(a.col3) FROM a WHERE a.col3 >= 0 GROUP BY a.col1, a.col2", 5},
 
-        // Aggregation without group by
+        // Aggregation without GROUP BY
         new Object[]{"SELECT COUNT(*) FROM a WHERE a.col3 >= 0 AND a.col2 = 'alice'", 1},
+
+        // project in intermediate stage
+        // Specifically table A has 15 rows (10 on server1 and 5 on server2) and table B has 5 rows (all on server1),
+        // col1 on both are "foo", "bar", "alice", "bob", "charlie"
+        // col2 on both are "foo", "bar", "alice", "foo", "bar",
+        //   filtered at :    ^                      ^
+        // thus the final JOIN result will have 6 rows: 3 "foo" <-> "foo"; and 3 "bob" <-> "bob"
+        new Object[]{"SELECT a.col1, a.col2, a.ts, b.col1, b.col3 FROM a JOIN b ON a.col1 = b.col2 "
+            + " WHERE a.col3 >= 0 AND a.col2 = 'foo' AND b.col3 >= 0", 6},
+
+        // Making transform after JOIN, number of rows should be the same as JOIN result.
+        new Object[]{"SELECT a.col1, a.ts, a.col3 - b.col3 FROM a JOIN b ON a.col1 = b.col2 "
+            + " WHERE a.col3 >= 0 AND b.col3 >= 0", 15},
+
+        // Making transform after GROUP-BY, number of rows should be the same as GROUP-BY result.
+        new Object[]{"SELECT a.col1, a.col2, SUM(a.col3) - MIN(a.col3) FROM a"
+            + " WHERE a.col3 >= 0 GROUP BY a.col1, a.col2", 5},
     };
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org