You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by si...@apache.org on 2022/07/01 04:26:43 UTC
[pinot] branch master updated: add project/transform operator to multistage engine (#8967)
This is an automated email from the ASF dual-hosted git repository.
siddteotia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new d4a64e3853 add project/transform operator to multistage engine (#8967)
d4a64e3853 is described below
commit d4a64e3853102b220cde905f79eaede0054bc43b
Author: Rong Rong <wa...@gmail.com>
AuthorDate: Thu Jun 30 21:26:35 2022 -0700
add project/transform operator to multistage engine (#8967)
* initial to add transform operator
- only reference is supported, no call yet
- should work with any transform
* also add transform operator with function call
* fix function resolution
Co-authored-by: Rong Rong <ro...@startree.ai>
---
.../pinot/query/planner/logical/StagePlanner.java | 1 +
.../runtime/executor/WorkerQueryExecutor.java | 5 +-
.../query/runtime/operator/AggregateOperator.java | 3 +
.../query/runtime/operator/OperatorUtils.java | 67 ++++++++
.../query/runtime/operator/TransformOperator.java | 185 +++++++++++++++++++++
.../pinot/query/runtime/QueryRunnerTest.java | 19 ++-
6 files changed, 278 insertions(+), 2 deletions(-)
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java
index 02c2e7fb28..cf0b218708 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java
@@ -95,6 +95,7 @@ public class StagePlanner {
}
// non-threadsafe
+ // TODO: add dataSchema (extracted from RelNode schema) to the StageNode.
private StageNode walkRelPlan(RelNode node, int currentStageId) {
if (isExchangeNode(node)) {
// 1. exchangeNode always have only one input, get its input converted as a new stage root.
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java
index 379d7bdbd6..009ebcdc3e 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java
@@ -43,6 +43,7 @@ import org.apache.pinot.query.runtime.operator.AggregateOperator;
import org.apache.pinot.query.runtime.operator.HashJoinOperator;
import org.apache.pinot.query.runtime.operator.MailboxReceiveOperator;
import org.apache.pinot.query.runtime.operator.MailboxSendOperator;
+import org.apache.pinot.query.runtime.operator.TransformOperator;
import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.slf4j.Logger;
@@ -127,7 +128,9 @@ public class WorkerQueryExecutor {
} else if (stageNode instanceof FilterNode) {
throw new UnsupportedOperationException("Unsupported!");
} else if (stageNode instanceof ProjectNode) {
- throw new UnsupportedOperationException("Unsupported!");
+ ProjectNode projectNode = (ProjectNode) stageNode;
+ return new TransformOperator(getOperator(requestId, projectNode.getInputs().get(0), metadataMap),
+ projectNode.getProjects(), projectNode.getInputs().get(0).getDataSchema());
} else {
throw new UnsupportedOperationException(
String.format("Stage node type %s is not supported!", stageNode.getClass().getSimpleName()));
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
index 95a5dcd03f..8e2c7ef77a 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
@@ -159,6 +159,7 @@ public class AggregateOperator extends BaseOperator<TransferableBlock> {
switch (((RexExpression.FunctionCall) aggCall).getFunctionName()) {
case "$SUM":
case "$SUM0":
+ case "SUM":
return new SumAggregationFunction(
ExpressionContext.forIdentifier(
((RexExpression.FunctionCall) aggCall).getFunctionOperands().get(0).toString()));
@@ -167,11 +168,13 @@ public class AggregateOperator extends BaseOperator<TransferableBlock> {
return new CountAggregationFunction();
case "$MIN":
case "$MIN0":
+ case "MIN":
return new MinAggregationFunction(
ExpressionContext.forIdentifier(
((RexExpression.FunctionCall) aggCall).getFunctionOperands().get(0).toString()));
case "$MAX":
case "$MAX0":
+ case "MAX":
return new MaxAggregationFunction(
ExpressionContext.forIdentifier(
((RexExpression.FunctionCall) aggCall).getFunctionOperands().get(0).toString()));
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OperatorUtils.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OperatorUtils.java
new file mode 100644
index 0000000000..10974f4002
--- /dev/null
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OperatorUtils.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.operator;
+
+import java.util.HashMap;
+import java.util.Map;
+
+
+public class OperatorUtils {
+
+ private static final Map<String, String> OPERATOR_TOKEN_MAPPING = new HashMap<>();
+
+ static {
+ OPERATOR_TOKEN_MAPPING.put("=", "EQ");
+ OPERATOR_TOKEN_MAPPING.put(">", "GT");
+ OPERATOR_TOKEN_MAPPING.put("<", "LT");
+ OPERATOR_TOKEN_MAPPING.put("?", "HOOK");
+ OPERATOR_TOKEN_MAPPING.put(":", "COLON");
+ OPERATOR_TOKEN_MAPPING.put("<=", "LE");
+ OPERATOR_TOKEN_MAPPING.put(">=", "GE");
+ OPERATOR_TOKEN_MAPPING.put("<>", "NE");
+ OPERATOR_TOKEN_MAPPING.put("!=", "NE2");
+ OPERATOR_TOKEN_MAPPING.put("+", "PLUS");
+ OPERATOR_TOKEN_MAPPING.put("-", "MINUS");
+ OPERATOR_TOKEN_MAPPING.put("*", "STAR");
+ OPERATOR_TOKEN_MAPPING.put("/", "DIVIDE");
+ OPERATOR_TOKEN_MAPPING.put("%", "PERCENT_REMAINDER");
+ OPERATOR_TOKEN_MAPPING.put("||", "CONCAT");
+ OPERATOR_TOKEN_MAPPING.put("=>", "NAMED_ARGUMENT_ASSIGNMENT");
+ OPERATOR_TOKEN_MAPPING.put("..", "DOUBLE_PERIOD");
+ OPERATOR_TOKEN_MAPPING.put("'", "QUOTE");
+ OPERATOR_TOKEN_MAPPING.put("\"", "DOUBLE_QUOTE");
+ OPERATOR_TOKEN_MAPPING.put("|", "VERTICAL_BAR");
+ OPERATOR_TOKEN_MAPPING.put("^", "CARET");
+ OPERATOR_TOKEN_MAPPING.put("$", "DOLLAR");
+ }
+
+ private OperatorUtils() {
+ // do not instantiate.
+ }
+
+ /**
+ * Canonicalize function name since Logical plan uses Parser.jj extracted tokens.
+ * @param functionName input Function name
+ * @return Canonicalize form of the input function name
+ */
+ public static String canonicalizeFunctionName(String functionName) {
+ functionName = OPERATOR_TOKEN_MAPPING.getOrDefault(functionName, functionName);
+ return functionName;
+ }
+}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/TransformOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/TransformOperator.java
new file mode 100644
index 0000000000..6744e35d99
--- /dev/null
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/TransformOperator.java
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.operator;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.function.FunctionInfo;
+import org.apache.pinot.common.function.FunctionInvoker;
+import org.apache.pinot.common.function.FunctionRegistry;
+import org.apache.pinot.common.function.FunctionUtils;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.core.common.datablock.BaseDataBlock;
+import org.apache.pinot.core.operator.BaseOperator;
+import org.apache.pinot.query.planner.logical.RexExpression;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
+
+
+/**
+ * This basic {@code TransformOperator} implement basic transformations.
+ */
+/**
+ * Operator that applies a list of projection/transform expressions to every row of the blocks produced by
+ * its upstream operator.
+ *
+ * <p>Each expression is either a direct reference to an upstream column ({@code RexExpression.InputRef}) or a
+ * function call ({@code RexExpression.FunctionCall}) whose operands are resolved recursively. Any other
+ * expression type is rejected with an {@link UnsupportedOperationException} when the operator is constructed.
+ */
+public class TransformOperator extends BaseOperator<TransferableBlock> {
+  private static final String EXPLAIN_NAME = "TRANSFORM";
+  private final BaseOperator<TransferableBlock> _upstreamOperator;
+  private final List<TransformOperands> _transformOperandsList;
+  private final int _resultColumnSize;
+  private final DataSchema _resultSchema;
+
+  /**
+   * Resolves every transform expression against the upstream schema and derives the result schema from the
+   * resolved operands.
+   *
+   * @param upstreamOperator operator whose blocks are transformed
+   * @param transforms one expression per result column, in result-column order
+   * @param upstreamDataSchema schema of the rows produced by {@code upstreamOperator}; used to look up the
+   *     name and type of referenced columns
+   * @throws UnsupportedOperationException if an expression is neither an input reference nor a function call
+   * @throws NullPointerException if a function call cannot be resolved in the function registry
+   */
+  public TransformOperator(BaseOperator<TransferableBlock> upstreamOperator, List<RexExpression> transforms,
+      DataSchema upstreamDataSchema) {
+    _upstreamOperator = upstreamOperator;
+    _resultColumnSize = transforms.size();
+    _transformOperandsList = new ArrayList<>(_resultColumnSize);
+    for (RexExpression rexExpression : transforms) {
+      _transformOperandsList.add(TransformOperands.toFunctionOperands(rexExpression, upstreamDataSchema));
+    }
+    String[] columnNames = new String[_resultColumnSize];
+    DataSchema.ColumnDataType[] columnDataTypes = new DataSchema.ColumnDataType[_resultColumnSize];
+    for (int i = 0; i < _resultColumnSize; i++) {
+      TransformOperands operands = _transformOperandsList.get(i);
+      columnNames[i] = operands.getResultName();
+      columnDataTypes[i] = operands.getResultType();
+    }
+    _resultSchema = new DataSchema(columnNames, columnDataTypes);
+  }
+
+  @Override
+  public List<Operator> getChildOperators() {
+    // WorkerExecutor doesn't use getChildOperators, returns null here.
+    return null;
+  }
+
+  @Nullable
+  @Override
+  public String toExplainString() {
+    return EXPLAIN_NAME;
+  }
+
+  /**
+   * Pulls the next block from the upstream operator and transforms it. Any exception raised while doing so
+   * is converted into an error block instead of being propagated to the caller.
+   */
+  @Override
+  protected TransferableBlock getNextBlock() {
+    try {
+      return transform(_upstreamOperator.nextBlock());
+    } catch (Exception e) {
+      return TransferableBlockUtils.getErrorTransferableBlock(e);
+    }
+  }
+
+  /**
+   * Evaluates every transform expression against every row of {@code block}.
+   *
+   * @param block block received from the upstream operator
+   * @return an end-of-stream block if {@code block} is end-of-stream, otherwise a row block holding one
+   *     result row per input row, described by the result schema
+   */
+  private TransferableBlock transform(TransferableBlock block)
+      throws Exception {
+    if (TransferableBlockUtils.isEndOfStream(block)) {
+      // NOTE(review): a fresh end-of-stream block is returned rather than the upstream block itself;
+      // confirm nothing carried by the upstream block needs to be propagated.
+      return TransferableBlockUtils.getEndOfStreamTransferableBlock();
+    }
+    List<Object[]> container = block.getContainer();
+    // Exactly one result row is produced per input row, so the final size is known up front.
+    List<Object[]> resultRows = new ArrayList<>(container.size());
+    for (Object[] row : container) {
+      Object[] resultRow = new Object[_resultColumnSize];
+      for (int i = 0; i < _resultColumnSize; i++) {
+        resultRow[i] = _transformOperandsList.get(i).apply(row);
+      }
+      resultRows.add(resultRow);
+    }
+    return new TransferableBlock(resultRows, _resultSchema, BaseDataBlock.Type.ROW);
+  }
+
+  /**
+   * A resolved expression that computes one value from an upstream row and knows the name and type of the
+   * column it produces.
+   */
+  private static abstract class TransformOperands {
+    protected String _resultName;
+    protected DataSchema.ColumnDataType _resultType;
+
+    /**
+     * Resolves an expression into its executable form.
+     *
+     * @param rexExpression expression to resolve
+     * @param dataSchema schema of the rows the resolved operand will be applied to
+     * @return a reference operand for an input reference, a function operand for a function call
+     * @throws UnsupportedOperationException for any other expression type
+     */
+    public static TransformOperands toFunctionOperands(RexExpression rexExpression, DataSchema dataSchema) {
+      if (rexExpression instanceof RexExpression.InputRef) {
+        return new ReferenceOperands((RexExpression.InputRef) rexExpression, dataSchema);
+      } else if (rexExpression instanceof RexExpression.FunctionCall) {
+        return new FunctionOperands((RexExpression.FunctionCall) rexExpression, dataSchema);
+      } else {
+        throw new UnsupportedOperationException("Unsupported RexExpression: " + rexExpression);
+      }
+    }
+
+    public String getResultName() {
+      return _resultName;
+    }
+
+    public DataSchema.ColumnDataType getResultType() {
+      return _resultType;
+    }
+
+    /**
+     * Computes this operand's value for the given upstream row.
+     */
+    public abstract Object apply(Object[] row);
+  }
+
+  /**
+   * Operand backed by a function looked up in the {@code FunctionRegistry}; its arguments are themselves
+   * operands evaluated against the same row.
+   */
+  private static class FunctionOperands extends TransformOperands {
+    private final List<TransformOperands> _childOperandList;
+    private final FunctionInvoker _functionInvoker;
+    // Argument array reused across apply() calls to avoid a per-row allocation. Because it is shared
+    // mutable state, apply() must not be called concurrently on the same instance.
+    private final Object[] _reusableOperandHolder;
+
+    public FunctionOperands(RexExpression.FunctionCall functionCall, DataSchema dataSchema) {
+      // recursively resolve child operands.
+      List<RexExpression> operandExpressions = functionCall.getFunctionOperands();
+      _childOperandList = new ArrayList<>(operandExpressions.size());
+      for (RexExpression childRexExpression : operandExpressions) {
+        _childOperandList.add(toFunctionOperands(childRexExpression, dataSchema));
+      }
+      // The registry is queried with the canonicalized name and the number of operands.
+      FunctionInfo functionInfo = FunctionRegistry.getFunctionInfo(
+          OperatorUtils.canonicalizeFunctionName(functionCall.getFunctionName()), operandExpressions.size());
+      Preconditions.checkNotNull(functionInfo, "Cannot find function with Name: "
+          + functionCall.getFunctionName());
+      _functionInvoker = new FunctionInvoker(functionInfo);
+      _resultName = computeColumnName(functionCall.getFunctionName(), _childOperandList);
+      _resultType = FunctionUtils.getColumnDataType(_functionInvoker.getResultClass());
+      _reusableOperandHolder = new Object[operandExpressions.size()];
+    }
+
+    @Override
+    public Object apply(Object[] row) {
+      for (int i = 0; i < _childOperandList.size(); i++) {
+        _reusableOperandHolder[i] = _childOperandList.get(i).apply(row);
+      }
+      // NOTE(review): arguments are handed to the invoker exactly as produced by the child operands;
+      // confirm whether they need converting to the function's parameter types first.
+      return _functionInvoker.invoke(_reusableOperandHolder);
+    }
+
+    /**
+     * Builds the result column name in the form {@code functionName(child1,child2,...)}, with the
+     * separator placed only between operands.
+     */
+    private static String computeColumnName(String functionName, List<TransformOperands> childOperands) {
+      StringBuilder sb = new StringBuilder();
+      sb.append(functionName);
+      sb.append("(");
+      for (int i = 0; i < childOperands.size(); i++) {
+        if (i > 0) {
+          sb.append(",");
+        }
+        sb.append(childOperands.get(i).getResultName());
+      }
+      sb.append(")");
+      return sb.toString();
+    }
+  }
+
+  /**
+   * Operand that forwards the value of a single upstream column, identified by its index in the row.
+   */
+  private static class ReferenceOperands extends TransformOperands {
+    private final int _refIndex;
+
+    public ReferenceOperands(RexExpression.InputRef inputRef, DataSchema dataSchema) {
+      _refIndex = inputRef.getIndex();
+      // The result column keeps the name and type of the referenced upstream column.
+      _resultType = dataSchema.getColumnDataType(_refIndex);
+      _resultName = dataSchema.getColumnName(_refIndex);
+    }
+
+    @Override
+    public Object apply(Object[] row) {
+      return row[_refIndex];
+    }
+  }
+}
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
index b409a4b063..ae75dd158b 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
@@ -182,8 +182,25 @@ public class QueryRunnerTest {
// Aggregation with multiple group key
new Object[]{"SELECT a.col2, a.col1, SUM(a.col3) FROM a WHERE a.col3 >= 0 GROUP BY a.col1, a.col2", 5},
- // Aggregation without group by
+ // Aggregation without GROUP BY
new Object[]{"SELECT COUNT(*) FROM a WHERE a.col3 >= 0 AND a.col2 = 'alice'", 1},
+
+ // project in intermediate stage
+ // Specifically table A has 15 rows (10 on server1 and 5 on server2) and table B has 5 rows (all on server1),
+ // col1 on both are "foo", "bar", "alice", "bob", "charlie"
+ // col2 on both are "foo", "bar", "alice", "foo", "bar",
+ // filtered at : ^ ^
+ // thus the final JOIN result will have 6 rows: 3 "foo" <-> "foo"; and 3 "bob" <-> "bob"
+ new Object[]{"SELECT a.col1, a.col2, a.ts, b.col1, b.col3 FROM a JOIN b ON a.col1 = b.col2 "
+ + " WHERE a.col3 >= 0 AND a.col2 = 'foo' AND b.col3 >= 0", 6},
+
+ // Making transform after JOIN, number of rows should be the same as JOIN result.
+ new Object[]{"SELECT a.col1, a.ts, a.col3 - b.col3 FROM a JOIN b ON a.col1 = b.col2 "
+ + " WHERE a.col3 >= 0 AND b.col3 >= 0", 15},
+
+ // Making transform after GROUP-BY, number of rows should be the same as GROUP-BY result.
+ new Object[]{"SELECT a.col1, a.col2, SUM(a.col3) - MIN(a.col3) FROM a"
+ + " WHERE a.col3 >= 0 GROUP BY a.col1, a.col2", 5},
};
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org