Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2022/04/16 15:51:42 UTC

[GitHub] [pinot] walterddr opened a new pull request, #8558: add support for project/filter pushdown

walterddr opened a new pull request, #8558:
URL: https://github.com/apache/pinot/pull/8558

   - Adding Projection/FilterNode
   - Adding Projection/FilterNode conversion logic
   - Adding pushdown rules
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] walterddr commented on a diff in pull request #8558: add support for project/filter pushdown

Posted by GitBox <gi...@apache.org>.
walterddr commented on code in PR #8558:
URL: https://github.com/apache/pinot/pull/8558#discussion_r854619137


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java:
##########
@@ -98,22 +100,27 @@ public void runJob() {
   private BaseOperator<DataTableBlock> getOperator(long requestId, StageNode stageNode,
       Map<Integer, StageMetadata> metadataMap) {
     // TODO: optimize this into a framework. (physical planner)
-    if (stageNode instanceof MailboxSendNode) {
-      MailboxSendNode sendNode = (MailboxSendNode) stageNode;
-      BaseOperator<DataTableBlock> nextOperator = getOperator(requestId, sendNode.getInputs().get(0), metadataMap);
-      StageMetadata receivingStageMetadata = metadataMap.get(sendNode.getReceiverStageId());
-      return new MailboxSendOperator(_mailboxService, nextOperator, receivingStageMetadata.getServerInstances(),
-          sendNode.getExchangeType(), _hostName, _port, requestId, sendNode.getStageId());
-    } else if (stageNode instanceof MailboxReceiveNode) {
+    if (stageNode instanceof MailboxReceiveNode) {
       MailboxReceiveNode receiveNode = (MailboxReceiveNode) stageNode;
       List<ServerInstance> sendingInstances = metadataMap.get(receiveNode.getSenderStageId()).getServerInstances();
       return new MailboxReceiveOperator(_mailboxService, RelDistribution.Type.ANY, sendingInstances, _hostName, _port,
           requestId, receiveNode.getSenderStageId());
+    } else if (stageNode instanceof MailboxSendNode) {
+      MailboxSendNode sendNode = (MailboxSendNode) stageNode;
+      BaseOperator<DataTableBlock> nextOperator = getOperator(requestId, sendNode.getInputs().get(0), metadataMap);
+      StageMetadata receivingStageMetadata = metadataMap.get(sendNode.getReceiverStageId());
+      return new MailboxSendOperator(_mailboxService, nextOperator, receivingStageMetadata.getServerInstances(),
+          sendNode.getExchangeType(), sendNode.getPartitionKeySelector(), _hostName, _port, requestId,
+          sendNode.getStageId());
     } else if (stageNode instanceof JoinNode) {
       JoinNode joinNode = (JoinNode) stageNode;
       BaseOperator<DataTableBlock> leftOperator = getOperator(requestId, joinNode.getInputs().get(0), metadataMap);
       BaseOperator<DataTableBlock> rightOperator = getOperator(requestId, joinNode.getInputs().get(1), metadataMap);
       return new BroadcastJoinOperator(leftOperator, rightOperator, joinNode.getCriteria());

Review Comment:
   Technically this should be renamed to `HashJoinOperator`, since it has nothing to do with the distribution method.





[GitHub] [pinot] siddharthteotia commented on a diff in pull request #8558: add support for project/filter pushdown

Posted by GitBox <gi...@apache.org>.
siddharthteotia commented on code in PR #8558:
URL: https://github.com/apache/pinot/pull/8558#discussion_r855433820


##########
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/RexExpression.java:
##########
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner.nodes;
+
+import java.math.BigDecimal;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.util.NlsString;
+import org.apache.pinot.query.planner.nodes.serde.ProtoProperties;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+
+/**
+ * {@code RexExpression} is the serializable format of the {@link RexNode}.
+ */
+public abstract class RexExpression {

Review Comment:
   Also, why is this abstract? The `RexNode` abstraction provided by Calcite should be enough; here we can convert `RexNode` to the internal `RexExpression` via static methods.





[GitHub] [pinot] siddharthteotia commented on a diff in pull request #8558: add support for project/filter pushdown

Posted by GitBox <gi...@apache.org>.
siddharthteotia commented on code in PR #8558:
URL: https://github.com/apache/pinot/pull/8558#discussion_r855377495


##########
pinot-query-planner/src/main/java/org/apache/pinot/query/parser/CalciteExpressionParser.java:
##########
@@ -46,10 +45,10 @@
 
 
 /**
- * Calcite parser to convert SQL expressions into {@link Expression}.
+ * This class provide API to parse a SQL string into Pinot query {@link SqlNode}.

Review Comment:
   I feel this class should ideally be deleted. It will get difficult to maintain parity between this and the existing `CalciteSqlParser`, especially because both work on SQL -> SqlNode -> PinotQuery.





[GitHub] [pinot] siddharthteotia commented on a diff in pull request #8558: add support for project/filter pushdown

Posted by GitBox <gi...@apache.org>.
siddharthteotia commented on code in PR #8558:
URL: https://github.com/apache/pinot/pull/8558#discussion_r855386729


##########
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/FilterNode.java:
##########
@@ -18,21 +18,25 @@
  */
 package org.apache.pinot.query.planner.nodes;
 
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexNode;
+import org.apache.pinot.query.planner.nodes.serde.ProtoProperties;
 
 
-public class CalcNode extends AbstractStageNode {
-  private String _expression;
+public class FilterNode extends AbstractStageNode {
+  @ProtoProperties
+  private RexExpression _condition;
 
-  public CalcNode(int stageId) {
-    super(stageId);
+  public FilterNode(int stageId) {
+    super(stageId, null);

Review Comment:
   I think `rowType` should be known and non-null here.





[GitHub] [pinot] siddharthteotia commented on a diff in pull request #8558: add support for project/filter pushdown

Posted by GitBox <gi...@apache.org>.
siddharthteotia commented on code in PR #8558:
URL: https://github.com/apache/pinot/pull/8558#discussion_r855374289


##########
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/StagePlanner.java:
##########
@@ -67,11 +68,12 @@ public QueryPlan makePlan(RelNode relRoot) {
     StageNode globalStageRoot = walkRelPlan(relRoot, getNewStageId());
 
     // global root needs to send results back to the ROOT, a.k.a. the client response node.
-    // the last stage is always a broadcast-gather.
+    // the last stage only has one receiver so doesn't matter what the exchange type is.

Review Comment:
   I agree that the last exchange at the top (between the sender(s) in the 1st stage and the single receiver in the 0th stage) can be anything. Maybe `RelDistribution.Type.ANY` is more suitable?
   
   Slightly related question: I think the exchange type is enough to decide the kind of sender (right now it will be broadcast; later it could be a shuffle / hash-partition exchange). How do we decide the exact kind of receiver? Especially for the receiver in the root stage, we ideally want to know whether it is unordered or ordered, for a simple vs. sorted merge respectively.
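
To make the unordered-versus-ordered receiver distinction concrete, here is a minimal sketch that is not taken from the PR. `ReceiverMergeSketch`, `simpleMerge`, and `sortedMerge` are hypothetical names, and rows are modelled as plain integers rather than `DataTableBlock`s. An unordered receiver can simply concatenate whatever arrives, while an ordered one has to k-way merge streams that each sender is assumed to have sorted already.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical illustration only; none of these names exist in Pinot.
class ReceiverMergeSketch {

  // Unordered receiver: append each sender's rows in arrival order.
  static List<Integer> simpleMerge(List<List<Integer>> senders) {
    List<Integer> out = new ArrayList<>();
    for (List<Integer> rows : senders) {
      out.addAll(rows);
    }
    return out;
  }

  // Ordered receiver: k-way merge, assuming every sender's rows are sorted ascending.
  static List<Integer> sortedMerge(List<List<Integer>> senders) {
    // Each heap entry is {senderIndex, offsetWithinSender}, ordered by the row it points at.
    PriorityQueue<int[]> heap =
        new PriorityQueue<>(Comparator.comparingInt((int[] e) -> senders.get(e[0]).get(e[1])));
    for (int i = 0; i < senders.size(); i++) {
      if (!senders.get(i).isEmpty()) {
        heap.add(new int[] {i, 0});
      }
    }
    List<Integer> out = new ArrayList<>();
    while (!heap.isEmpty()) {
      int[] head = heap.poll();
      List<Integer> rows = senders.get(head[0]);
      out.add(rows.get(head[1]));
      // Advance within the sender we just consumed from, if it has more rows.
      if (head[1] + 1 < rows.size()) {
        heap.add(new int[] {head[0], head[1] + 1});
      }
    }
    return out;
  }
}
```

The point of the sketch is that the two receivers need different information: the sorted variant must know the sort key and direction, which the exchange type alone does not carry.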





[GitHub] [pinot] siddharthteotia commented on a diff in pull request #8558: add support for project/filter pushdown

Posted by GitBox <gi...@apache.org>.
siddharthteotia commented on code in PR #8558:
URL: https://github.com/apache/pinot/pull/8558#discussion_r855460689


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java:
##########
@@ -142,6 +152,13 @@ protected DataTableBlock getNextBlock() {
           }
           break;
         case HASH_DISTRIBUTED:
+          // TODO: ensure that server instance list is sorted using same function in sender.

Review Comment:
   Do we have tests for each of the 3 distribution types?
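
The TODO in this hunk ("ensure that server instance list is sorted using same function in sender") hints at one property a hash-distribution test could check: the same key should reach the same instance no matter in which order the instance list is supplied. The following is a minimal sketch under stated assumptions, not Pinot code. `HashRoutingSketch` and `route` are hypothetical, instances are modelled as strings, and `hashCode` modulo the instance count stands in for whatever partition function the real operator uses.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical illustration only; the real operator works on ServerInstance objects.
class HashRoutingSketch {

  // Picks a target instance for a partition key, independent of the input list order.
  static String route(Object key, List<String> instances) {
    // Sort a copy so every caller indexes into the same ordering.
    List<String> sorted = new ArrayList<>(instances);
    Collections.sort(sorted);
    // floorMod keeps the bucket non-negative even for negative hash codes.
    int bucket = Math.floorMod(key.hashCode(), sorted.size());
    return sorted.get(bucket);
  }
}
```

A test in this style would call `route` with the same key and differently ordered instance lists and assert that the results agree.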





[GitHub] [pinot] siddharthteotia commented on a diff in pull request #8558: add support for project/filter pushdown

Posted by GitBox <gi...@apache.org>.
siddharthteotia commented on code in PR #8558:
URL: https://github.com/apache/pinot/pull/8558#discussion_r855395674


##########
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/StagePlanner.java:
##########
@@ -95,12 +97,15 @@ private StageNode walkRelPlan(RelNode node, int currentStageId) {
     if (isExchangeNode(node)) {
       // 1. exchangeNode always have only one input, get its input converted as a new stage root.
       StageNode nextStageRoot = walkRelPlan(node.getInput(0), getNewStageId());
-      RelDistribution.Type exchangeType = ((LogicalExchange) node).distribution.getType();
+      RelDistribution distribution = ((LogicalExchange) node).getDistribution();
+      RelDistribution.Type exchangeType = distribution.getType();
 
       // 2. make an exchange sender and receiver node pair
-      StageNode mailboxReceiver = new MailboxReceiveNode(currentStageId, nextStageRoot.getStageId(), exchangeType);
-      StageNode mailboxSender = new MailboxSendNode(nextStageRoot.getStageId(), mailboxReceiver.getStageId(),
+      StageNode mailboxReceiver = new MailboxReceiveNode(currentStageId, node.getRowType(), nextStageRoot.getStageId(),
           exchangeType);
+      StageNode mailboxSender = new MailboxSendNode(nextStageRoot.getStageId(), node.getRowType(),
+          mailboxReceiver.getStageId(), exchangeType, exchangeType == RelDistribution.Type.HASH_DISTRIBUTED
+          ? new FieldSelectionKeySelector(distribution.getKeys().get(0)) : null);

Review Comment:
   Yes, I totally agree on having this decision dictated by statistics, the optimizer, etc. My point was more that if we are hardcoding the exchange type in the initial implementation, it could be broadcast, as I feel that is simpler to debug and get right initially.





[GitHub] [pinot] siddharthteotia commented on a diff in pull request #8558: add support for project/filter pushdown

Posted by GitBox <gi...@apache.org>.
siddharthteotia commented on code in PR #8558:
URL: https://github.com/apache/pinot/pull/8558#discussion_r855446659


##########
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/RexExpression.java:
##########
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner.nodes;
+
+import java.math.BigDecimal;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.util.NlsString;
+import org.apache.pinot.query.planner.nodes.serde.ProtoProperties;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+
+/**
+ * {@code RexExpression} is the serializable format of the {@link RexNode}.
+ */
+public abstract class RexExpression {
+  @ProtoProperties
+  protected SqlKind _sqlKind;
+  @ProtoProperties
+  protected RelDataType _dataType;
+
+  public SqlKind getKind() {
+    return _sqlKind;
+  }
+
+  public RelDataType getDataType() {
+    return _dataType;
+  }
+
+  public static RexExpression toRexExpression(RexNode rexNode) {
+    if (rexNode instanceof RexInputRef) {
+      return new RexExpression.InputRef(((RexInputRef) rexNode).getIndex());
+    } else if (rexNode instanceof RexLiteral) {
+      RexLiteral rexLiteral = ((RexLiteral) rexNode);
+      return new RexExpression.Literal(rexLiteral.getType(), rexLiteral.getTypeName(), rexLiteral.getValue());
+    } else if (rexNode instanceof RexCall) {
+      RexCall rexCall = (RexCall) rexNode;
+      List<RexExpression> operands = rexCall.getOperands().stream().map(RexExpression::toRexExpression)
+          .collect(Collectors.toList());
+      return new RexExpression.FunctionCall(rexCall.getKind(), rexCall.getType(), rexCall.getOperator().getName(),
+          operands);
+    } else {
+      throw new IllegalArgumentException("Unsupported RexNode type with SqlKind: " + rexNode.getKind());
+    }
+  }
+
+  private static Comparable convertLiteral(Comparable value, SqlTypeName sqlTypeName, RelDataType dataType) {
+    switch (sqlTypeName) {
+      case BOOLEAN:
+        return (boolean) value;
+      case DECIMAL:

Review Comment:
   The docs of `RexLiteral` don't seem to suggest that the underlying `SqlTypeName` for numeric literals will be wrapped inside `DECIMAL`.
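
One way to sidestep the question is to branch on the declared type rather than on the literal's own type name. The sketch below assumes, as the `DECIMAL` branch in this hunk appears to, that a numeric literal's value arrives as a `BigDecimal`; that assumption should be checked against the linked Calcite docs. `LiteralConversionSketch`, `DeclaredType`, and `narrow` are hypothetical names, and the small enum stands in for Calcite's `SqlTypeName`.

```java
import java.math.BigDecimal;

// Hypothetical illustration only; not the PR's convertLiteral.
class LiteralConversionSketch {

  // Hypothetical subset of declared types; real code would switch on Calcite's SqlTypeName.
  enum DeclaredType { INTEGER, BIGINT, DOUBLE, DECIMAL }

  // Narrows a BigDecimal literal value to the Java type implied by the declared type.
  static Comparable<?> narrow(BigDecimal value, DeclaredType declaredType) {
    switch (declaredType) {
      case INTEGER:
        // Throws ArithmeticException on overflow or a non-zero fractional part.
        return value.intValueExact();
      case BIGINT:
        return value.longValueExact();
      case DOUBLE:
        return value.doubleValue();
      case DECIMAL:
        return value;
      default:
        throw new IllegalArgumentException("Unsupported type: " + declaredType);
    }
  }
}
```

Using the exact-value accessors makes a mismatch between literal and declared type fail loudly instead of silently truncating.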
   
   https://calcite.apache.org/javadocAggregate/org/apache/calcite/rex/RexLiteral.html





[GitHub] [pinot] siddharthteotia commented on a diff in pull request #8558: add support for project/filter pushdown

Posted by GitBox <gi...@apache.org>.
siddharthteotia commented on code in PR #8558:
URL: https://github.com/apache/pinot/pull/8558#discussion_r855381193


##########
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/AbstractStageNode.java:
##########
@@ -20,18 +20,25 @@
 
 import java.util.ArrayList;
 import java.util.List;
+import org.apache.calcite.rel.type.RelDataType;
 import org.apache.pinot.common.proto.Plan;
+import org.apache.pinot.query.planner.nodes.serde.ProtoProperties;
 import org.apache.pinot.query.planner.nodes.serde.ProtoSerializable;
 import org.apache.pinot.query.planner.nodes.serde.ProtoSerializationUtils;
 
 
 public abstract class AbstractStageNode implements StageNode, ProtoSerializable {
 
+  @ProtoProperties
   protected final int _stageId;
+  @ProtoProperties
   protected final List<StageNode> _inputs;
+  @ProtoProperties
+  protected RelDataType _rowType;
 
-  public AbstractStageNode(int stageId) {
+  public AbstractStageNode(int stageId, RelDataType rowType) {
     _stageId = stageId;
+    _rowType = rowType;

Review Comment:
   Assert for non-null?
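
A minimal sketch of what such a check could look like, using only the JDK. `StageNodeSketch` is a hypothetical stand-in for the stage node, and a `String` replaces `RelDataType` so the example stays self-contained. Guava's `Preconditions.checkNotNull` would be an equivalent choice.

```java
import java.util.Objects;

// Hypothetical stand-in for a stage node; String replaces RelDataType here.
class StageNodeSketch {
  private final int _stageId;
  private final String _rowType;

  StageNodeSketch(int stageId, String rowType) {
    _stageId = stageId;
    // Fail at construction time rather than with a later, harder-to-trace NullPointerException.
    _rowType = Objects.requireNonNull(rowType, "rowType must not be null for stage " + stageId);
  }

  int getStageId() {
    return _stageId;
  }

  String getRowType() {
    return _rowType;
  }
}
```

Note that the single-argument constructors in this diff call `super(stageId, null)`, so they would need a real row type, or a separate path, before a check like this could be switched on.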





[GitHub] [pinot] siddharthteotia commented on a diff in pull request #8558: add support for project/filter pushdown

Posted by GitBox <gi...@apache.org>.
siddharthteotia commented on code in PR #8558:
URL: https://github.com/apache/pinot/pull/8558#discussion_r855382139


##########
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/MailboxSendNode.java:
##########
@@ -18,21 +18,35 @@
  */
 package org.apache.pinot.query.planner.nodes;
 
+import javax.annotation.Nullable;
 import org.apache.calcite.rel.RelDistribution;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.pinot.query.planner.nodes.serde.ProtoProperties;
+import org.apache.pinot.query.planner.partitioning.KeySelector;
 
 
 public class MailboxSendNode extends AbstractStageNode {
+  @ProtoProperties
   private int _receiverStageId;
+  @ProtoProperties
   private RelDistribution.Type _exchangeType;
+  @ProtoProperties
+  private KeySelector<Object[], Object> _partitionKeySelector;
 
   public MailboxSendNode(int stageId) {
-    super(stageId);
+    super(stageId, null);
   }
 
-  public MailboxSendNode(int stageId, int receiverStageId, RelDistribution.Type exchangeType) {
-    super(stageId);
+  public MailboxSendNode(int stageId, RelDataType rowType, int receiverStageId, RelDistribution.Type exchangeType) {
+    this(stageId, rowType, receiverStageId, exchangeType, null);
+  }
+
+  public MailboxSendNode(int stageId, RelDataType rowType, int receiverStageId, RelDistribution.Type exchangeType,
+      @Nullable KeySelector<Object[], Object> partitionKeySelector) {
+    super(stageId, rowType);
     _receiverStageId = receiverStageId;
     _exchangeType = exchangeType;
+    _partitionKeySelector = partitionKeySelector;

Review Comment:
   This should be non-null only if the exchange type is hash-based?
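
If that invariant is intended, the constructor could enforce it directly. The sketch below is hypothetical and not part of the PR: `SendNodeSketch` is an invented class, its enum is a trimmed copy of a few `RelDistribution.Type` constants, and `java.util.function.Function` stands in for `KeySelector<Object[], Object>`.

```java
import java.util.function.Function;

// Hypothetical illustration only; not the PR's MailboxSendNode.
class SendNodeSketch {

  // Trimmed stand-in for RelDistribution.Type.
  enum ExchangeType { SINGLETON, BROADCAST_DISTRIBUTED, HASH_DISTRIBUTED }

  private final ExchangeType _exchangeType;
  private final Function<Object[], Object> _partitionKeySelector;

  SendNodeSketch(ExchangeType exchangeType, Function<Object[], Object> partitionKeySelector) {
    boolean isHash = exchangeType == ExchangeType.HASH_DISTRIBUTED;
    // A selector is required for hash exchanges and meaningless for every other type.
    if (isHash != (partitionKeySelector != null)) {
      throw new IllegalArgumentException(
          "partitionKeySelector must be set if and only if the exchange type is HASH_DISTRIBUTED, got "
              + exchangeType);
    }
    _exchangeType = exchangeType;
    _partitionKeySelector = partitionKeySelector;
  }

  ExchangeType getExchangeType() {
    return _exchangeType;
  }

  Function<Object[], Object> getPartitionKeySelector() {
    return _partitionKeySelector;
  }
}
```

Checking both directions catches a forgotten selector on a hash exchange as well as a stray selector on a broadcast one.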





[GitHub] [pinot] siddharthteotia merged pull request #8558: add support for project/filter pushdown

Posted by GitBox <gi...@apache.org>.
siddharthteotia merged PR #8558:
URL: https://github.com/apache/pinot/pull/8558




[GitHub] [pinot] codecov-commenter commented on pull request #8558: add support for project/filter pushdown

Posted by GitBox <gi...@apache.org>.
codecov-commenter commented on PR #8558:
URL: https://github.com/apache/pinot/pull/8558#issuecomment-1100700442

   # [Codecov](https://codecov.io/gh/apache/pinot/pull/8558?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > :exclamation: No coverage uploaded for pull request base (`multi_stage_query_engine@791248a`). [Click here to learn what that means](https://docs.codecov.io/docs/error-reference?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#section-missing-base-commit).
   > The diff coverage is `n/a`.
   
   ```diff
   @@                     Coverage Diff                     @@
   ##             multi_stage_query_engine    #8558   +/-   ##
   ===========================================================
     Coverage                            ?   67.03%           
     Complexity                          ?     4198           
   ===========================================================
     Files                               ?     1262           
     Lines                               ?    63787           
     Branches                            ?    10023           
   ===========================================================
     Hits                                ?    42760           
     Misses                              ?    17951           
     Partials                            ?     3076           
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | unittests1 | `67.03% <0.00%> (?)` | |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/pinot/pull/8558?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/pinot/pull/8558?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [791248a...841e732](https://codecov.io/gh/apache/pinot/pull/8558?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   




[GitHub] [pinot] siddharthteotia commented on a diff in pull request #8558: add support for project/filter pushdown

Posted by GitBox <gi...@apache.org>.
siddharthteotia commented on code in PR #8558:
URL: https://github.com/apache/pinot/pull/8558#discussion_r855472401


##########
pinot-query-planner/src/main/java/org/apache/pinot/query/parser/CalciteRexExpressionParser.java:
##########
@@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.parser;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.pinot.common.request.Expression;
+import org.apache.pinot.common.request.ExpressionType;
+import org.apache.pinot.common.request.Function;
+import org.apache.pinot.common.request.PinotQuery;
+import org.apache.pinot.common.utils.request.RequestUtils;
+import org.apache.pinot.query.planner.nodes.RexExpression;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.apache.pinot.sql.parsers.SqlCompilationException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Calcite parser to convert SQL expressions into {@link Expression}.
+ *
+ * <p>This class is extracted from {@link org.apache.pinot.sql.parsers.CalciteSqlParser}. It contains the logic
+ * to parsed {@link org.apache.calcite.rex.RexNode}, in the format of {@link RexExpression} and convert them into
+ * Thrift {@link Expression} format.
+ */
+public class CalciteRexExpressionParser {

Review Comment:
   Let's discuss this a bit. I am not sure why we need this (unless we are planning to use the `RelDataType` information coming from the Calcite row expression nodes) or how it will be plugged into the planning stage.





[GitHub] [pinot] siddharthteotia commented on a diff in pull request #8558: add support for project/filter pushdown

Posted by GitBox <gi...@apache.org>.
siddharthteotia commented on code in PR #8558:
URL: https://github.com/apache/pinot/pull/8558#discussion_r855405640


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java:
##########
@@ -98,22 +100,27 @@ public void runJob() {
   private BaseOperator<DataTableBlock> getOperator(long requestId, StageNode stageNode,
       Map<Integer, StageMetadata> metadataMap) {
     // TODO: optimize this into a framework. (physical planner)
-    if (stageNode instanceof MailboxSendNode) {
-      MailboxSendNode sendNode = (MailboxSendNode) stageNode;
-      BaseOperator<DataTableBlock> nextOperator = getOperator(requestId, sendNode.getInputs().get(0), metadataMap);
-      StageMetadata receivingStageMetadata = metadataMap.get(sendNode.getReceiverStageId());
-      return new MailboxSendOperator(_mailboxService, nextOperator, receivingStageMetadata.getServerInstances(),
-          sendNode.getExchangeType(), _hostName, _port, requestId, sendNode.getStageId());
-    } else if (stageNode instanceof MailboxReceiveNode) {
+    if (stageNode instanceof MailboxReceiveNode) {
       MailboxReceiveNode receiveNode = (MailboxReceiveNode) stageNode;
       List<ServerInstance> sendingInstances = metadataMap.get(receiveNode.getSenderStageId()).getServerInstances();
       return new MailboxReceiveOperator(_mailboxService, RelDistribution.Type.ANY, sendingInstances, _hostName, _port,
           requestId, receiveNode.getSenderStageId());
+    } else if (stageNode instanceof MailboxSendNode) {
+      MailboxSendNode sendNode = (MailboxSendNode) stageNode;
+      BaseOperator<DataTableBlock> nextOperator = getOperator(requestId, sendNode.getInputs().get(0), metadataMap);
+      StageMetadata receivingStageMetadata = metadataMap.get(sendNode.getReceiverStageId());
+      return new MailboxSendOperator(_mailboxService, nextOperator, receivingStageMetadata.getServerInstances(),
+          sendNode.getExchangeType(), sendNode.getPartitionKeySelector(), _hostName, _port, requestId,
+          sendNode.getStageId());
     } else if (stageNode instanceof JoinNode) {
       JoinNode joinNode = (JoinNode) stageNode;
       BaseOperator<DataTableBlock> leftOperator = getOperator(requestId, joinNode.getInputs().get(0), metadataMap);
       BaseOperator<DataTableBlock> rightOperator = getOperator(requestId, joinNode.getInputs().get(1), metadataMap);
       return new BroadcastJoinOperator(leftOperator, rightOperator, joinNode.getCriteria());

Review Comment:
   +1. It is a physical relational operator that executes the join using hashing. A nested-loop join could also be implemented in the future, and the distribution strategy could still be broadcast.
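
To make the distinction between join algorithm and distribution strategy concrete, here is a minimal sketch of a hash join (build a hash table on one input, probe it with the other). It says nothing about how rows reach the operator. The class and method names are hypothetical and are not the `BroadcastJoinOperator` from this PR; rows are modeled as plain `Object[]`, and skipping null keys is an assumption.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only; not the operator in this PR.
final class HashJoinSketch {
  private HashJoinSketch() {
  }

  // Inner equi-join: build a hash table on the right rows, then probe it with the left rows.
  static List<Object[]> join(List<Object[]> left, int leftKey, List<Object[]> right, int rightKey) {
    Map<Object, List<Object[]>> buildSide = new HashMap<>();
    for (Object[] row : right) {
      if (row[rightKey] != null) { // assumption: null keys never match
        buildSide.computeIfAbsent(row[rightKey], k -> new ArrayList<>()).add(row);
      }
    }
    List<Object[]> joined = new ArrayList<>();
    for (Object[] leftRow : left) {
      List<Object[]> matches = leftRow[leftKey] == null ? null : buildSide.get(leftRow[leftKey]);
      if (matches == null) {
        continue;
      }
      for (Object[] rightRow : matches) {
        // Output row is the left columns followed by the right columns.
        Object[] out = Arrays.copyOf(leftRow, leftRow.length + rightRow.length);
        System.arraycopy(rightRow, 0, out, leftRow.length, rightRow.length);
        joined.add(out);
      }
    }
    return joined;
  }
}
```

The same `join` works whether the right input was broadcast to every worker or both inputs were hash-partitioned on the key, which is why the operator name arguably should describe the algorithm rather than the exchange.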





[GitHub] [pinot] walterddr commented on a diff in pull request #8558: add support for project/filter pushdown

Posted by GitBox <gi...@apache.org>.
walterddr commented on code in PR #8558:
URL: https://github.com/apache/pinot/pull/8558#discussion_r855385322


##########
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/StagePlanner.java:
##########
@@ -95,12 +97,15 @@ private StageNode walkRelPlan(RelNode node, int currentStageId) {
     if (isExchangeNode(node)) {
       // 1. exchangeNode always have only one input, get its input converted as a new stage root.
       StageNode nextStageRoot = walkRelPlan(node.getInput(0), getNewStageId());
-      RelDistribution.Type exchangeType = ((LogicalExchange) node).distribution.getType();
+      RelDistribution distribution = ((LogicalExchange) node).getDistribution();
+      RelDistribution.Type exchangeType = distribution.getType();
 
       // 2. make an exchange sender and receiver node pair
-      StageNode mailboxReceiver = new MailboxReceiveNode(currentStageId, nextStageRoot.getStageId(), exchangeType);
-      StageNode mailboxSender = new MailboxSendNode(nextStageRoot.getStageId(), mailboxReceiver.getStageId(),
+      StageNode mailboxReceiver = new MailboxReceiveNode(currentStageId, node.getRowType(), nextStageRoot.getStageId(),
           exchangeType);
+      StageNode mailboxSender = new MailboxSendNode(nextStageRoot.getStageId(), node.getRowType(),
+          mailboxReceiver.getStageId(), exchangeType, exchangeType == RelDistribution.Type.HASH_DISTRIBUTED
+          ? new FieldSelectionKeySelector(distribution.getKeys().get(0)) : null);

Review Comment:
   Yes. Ideally, we should use `SqlHints` to decide whether to use the broadcast or the distributed hash sender. I can enable either broadcast or hash by default and keep the other one conditional.
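
For context, a hash-distributed sender has to pick a receiver for every row from the value of the partition key. The following is a minimal sketch under stated assumptions: the class and method names are hypothetical (they are not `FieldSelectionKeySelector` or `MailboxSendOperator`), rows are plain `Object[]`, and the key is a single column index.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

// Hypothetical sketch of a hash-distributed sender; none of these names come from the PR.
final class HashExchangeSketch {
  private HashExchangeSketch() {
  }

  // Maps a partition key to one of numReceivers buckets; floorMod keeps the result non-negative.
  static int partition(Object key, int numReceivers) {
    return Math.floorMod(Objects.hashCode(key), numReceivers);
  }

  // Routes each row to the receiver chosen by hashing the value at keyColumn.
  static List<List<Object[]>> route(List<Object[]> rows, int keyColumn, int numReceivers) {
    List<List<Object[]>> buckets = new ArrayList<>();
    for (int i = 0; i < numReceivers; i++) {
      buckets.add(new ArrayList<>());
    }
    for (Object[] row : rows) {
      buckets.get(partition(row[keyColumn], numReceivers)).add(row);
    }
    return buckets;
  }
}
```

If both join inputs are routed with the same function on their respective join keys, rows with equal keys end up on the same receiver, which is what makes a partitioned join possible without broadcasting either side.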
   





[GitHub] [pinot] walterddr commented on a diff in pull request #8558: add support for project/filter pushdown

Posted by GitBox <gi...@apache.org>.
walterddr commented on code in PR #8558:
URL: https://github.com/apache/pinot/pull/8558#discussion_r855750062


##########
pinot-query-planner/src/main/java/org/apache/pinot/query/rules/PinotExchangeNodeInsertRule.java:
##########
@@ -57,12 +63,26 @@ public boolean matches(RelOptRuleCall call) {
 
   @Override
   public void onMatch(RelOptRuleCall call) {
+    // TODO: this only works for single equality JOIN. add generic condition parser
     Join join = call.rel(0);
     RelNode leftInput = join.getInput(0);
     RelNode rightInput = join.getInput(1);
 
-    RelNode leftExchange = LogicalExchange.create(leftInput, RelDistributions.SINGLETON);
-    RelNode rightExchange = LogicalExchange.create(rightInput, RelDistributions.BROADCAST_DISTRIBUTED);
+    RelNode leftExchange;
+    RelNode rightExchange;
+    List<RelHint> hints = join.getHints();
+    if (hints.contains(PinotRelationalHints.USE_HASH_JOIN)) {

Review Comment:
   Renamed. Sorry for the confusion: this should be `USE_HASH_DISTRIBUTE`. It has nothing to do with the join type.



##########
pinot-query-planner/src/main/java/org/apache/pinot/query/rules/PinotExchangeNodeInsertRule.java:
##########
@@ -57,12 +63,26 @@ public boolean matches(RelOptRuleCall call) {
 
   @Override
   public void onMatch(RelOptRuleCall call) {
+    // TODO: this only works for single equality JOIN. add generic condition parser
     Join join = call.rel(0);
     RelNode leftInput = join.getInput(0);
     RelNode rightInput = join.getInput(1);
 
-    RelNode leftExchange = LogicalExchange.create(leftInput, RelDistributions.SINGLETON);
-    RelNode rightExchange = LogicalExchange.create(rightInput, RelDistributions.BROADCAST_DISTRIBUTED);
+    RelNode leftExchange;
+    RelNode rightExchange;
+    List<RelHint> hints = join.getHints();
+    if (hints.contains(PinotRelationalHints.USE_HASH_JOIN)) {

Review Comment:
   Renamed. Sorry for the confusion: this should be `USE_HASH_DISTRIBUTE`. It is not meant to hint the join algorithm.





[GitHub] [pinot] siddharthteotia commented on a diff in pull request #8558: add support for project/filter pushdown

Posted by GitBox <gi...@apache.org>.
siddharthteotia commented on code in PR #8558:
URL: https://github.com/apache/pinot/pull/8558#discussion_r855451443


##########
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/RexExpression.java:
##########
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner.nodes;
+
+import java.math.BigDecimal;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.util.NlsString;
+import org.apache.pinot.query.planner.nodes.serde.ProtoProperties;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+
+/**
+ * {@code RexExpression} is the serializable format of the {@link RexNode}.
+ */
+public abstract class RexExpression {
+  @ProtoProperties
+  protected SqlKind _sqlKind;
+  @ProtoProperties
+  protected RelDataType _dataType;
+
+  public SqlKind getKind() {
+    return _sqlKind;
+  }
+
+  public RelDataType getDataType() {
+    return _dataType;
+  }
+
+  public static RexExpression toRexExpression(RexNode rexNode) {
+    if (rexNode instanceof RexInputRef) {
+      return new RexExpression.InputRef(((RexInputRef) rexNode).getIndex());
+    } else if (rexNode instanceof RexLiteral) {
+      RexLiteral rexLiteral = ((RexLiteral) rexNode);
+      return new RexExpression.Literal(rexLiteral.getType(), rexLiteral.getTypeName(), rexLiteral.getValue());
+    } else if (rexNode instanceof RexCall) {
+      RexCall rexCall = (RexCall) rexNode;
+      List<RexExpression> operands = rexCall.getOperands().stream().map(RexExpression::toRexExpression)
+          .collect(Collectors.toList());
+      return new RexExpression.FunctionCall(rexCall.getKind(), rexCall.getType(), rexCall.getOperator().getName(),
+          operands);
+    } else {
+      throw new IllegalArgumentException("Unsupported RexNode type with SqlKind: " + rexNode.getKind());
+    }
+  }
+
+  private static Comparable convertLiteral(Comparable value, SqlTypeName sqlTypeName, RelDataType dataType) {
+    switch (sqlTypeName) {
+      case BOOLEAN:
+        return (boolean) value;
+      case DECIMAL:
+        switch (dataType.getSqlTypeName()) {
+          case INTEGER:
+            return ((BigDecimal) value).intValue();
+          case BIGINT:
+            return ((BigDecimal) value).longValue();
+          case FLOAT:
+            return ((BigDecimal) value).floatValue();
+          case DOUBLE:
+          default:
+            return ((BigDecimal) value).doubleValue();
+        }
+      case CHAR:
+        switch (dataType.getSqlTypeName()) {
+          case VARCHAR:
+            return ((NlsString) value).getValue();
+          default:
+            return value;
+        }
+      default:
+        return value;
+    }
+  }
+
+  public static class InputRef extends RexExpression {

Review Comment:
   I think this is the only one that is currently not modeled as part of our existing Thrift `Expression`, which already supports Identifier, Literal and Function. So can we reuse `Literal` and `Function` below instead of modeling them again in `RexExpression`? Eventually `RexExpression` has to be converted to a Thrift `Expression` anyway, much like `SqlNode` is converted to `PinotQuery`.





[GitHub] [pinot] siddharthteotia commented on a diff in pull request #8558: add support for project/filter pushdown

Posted by GitBox <gi...@apache.org>.
siddharthteotia commented on code in PR #8558:
URL: https://github.com/apache/pinot/pull/8558#discussion_r855380265


##########
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/AbstractStageNode.java:
##########
@@ -20,18 +20,25 @@
 

Review Comment:
   (nit) Shall we rename this package to `pinot.query.planner.nodes.logical` or just `pinot.query.planner.logical`?





[GitHub] [pinot] walterddr commented on a diff in pull request #8558: add support for project/filter pushdown

Posted by GitBox <gi...@apache.org>.
walterddr commented on code in PR #8558:
URL: https://github.com/apache/pinot/pull/8558#discussion_r855611429


##########
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/RexExpression.java:
##########
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner.nodes;
+
+import java.math.BigDecimal;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.util.NlsString;
+import org.apache.pinot.query.planner.nodes.serde.ProtoProperties;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+
+/**
+ * {@code RexExpression} is the serializable format of the {@link RexNode}.
+ */
+public abstract class RexExpression {
+  @ProtoProperties
+  protected SqlKind _sqlKind;
+  @ProtoProperties
+  protected RelDataType _dataType;
+
+  public SqlKind getKind() {
+    return _sqlKind;
+  }
+
+  public RelDataType getDataType() {
+    return _dataType;
+  }
+
+  public static RexExpression toRexExpression(RexNode rexNode) {
+    if (rexNode instanceof RexInputRef) {
+      return new RexExpression.InputRef(((RexInputRef) rexNode).getIndex());
+    } else if (rexNode instanceof RexLiteral) {
+      RexLiteral rexLiteral = ((RexLiteral) rexNode);
+      return new RexExpression.Literal(rexLiteral.getType(), rexLiteral.getTypeName(), rexLiteral.getValue());
+    } else if (rexNode instanceof RexCall) {
+      RexCall rexCall = (RexCall) rexNode;
+      List<RexExpression> operands = rexCall.getOperands().stream().map(RexExpression::toRexExpression)
+          .collect(Collectors.toList());
+      return new RexExpression.FunctionCall(rexCall.getKind(), rexCall.getType(), rexCall.getOperator().getName(),
+          operands);
+    } else {
+      throw new IllegalArgumentException("Unsupported RexNode type with SqlKind: " + rexNode.getKind());
+    }
+  }
+
+  private static Comparable convertLiteral(Comparable value, SqlTypeName sqlTypeName, RelDataType dataType) {
+    switch (sqlTypeName) {
+      case BOOLEAN:
+        return (boolean) value;
+      case DECIMAL:

Review Comment:
   It's actually the value that matters: we need to convert the list of wrapped literal values to Pinot-specific types.
   There are additional things that need to be addressed. For example, `IN ('a', 'b')` is now encoded as a `Sarg('a', 'b')` where each string is actually an `NlsString`, so there is more to deal with.
   
   I will follow up with a separate PR on the literal conversion.
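
As a rough illustration of the kind of value conversion discussed here, the sketch below unwraps planner-side literal values into plain Java values for a requested target type, and applies the same conversion to every element of an `IN` list. It is an assumption-laden sketch, not the follow-up PR: `TargetType` and both method names are hypothetical, and it deliberately avoids Calcite classes so that it stays self-contained (real code would unwrap an `NlsString` via `getValue()`, as the diff above does).

```java
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of literal conversion; the type names are illustrative only.
final class LiteralConversionSketch {
  enum TargetType { INT, LONG, FLOAT, DOUBLE, STRING }

  private LiteralConversionSketch() {
  }

  // Converts a single literal value; numeric literals are assumed to arrive as BigDecimal.
  static Object convert(Object value, TargetType target) {
    if (value == null) {
      return null;
    }
    if (value instanceof BigDecimal) {
      BigDecimal decimal = (BigDecimal) value;
      switch (target) {
        case INT:
          return decimal.intValue();
        case LONG:
          return decimal.longValue();
        case FLOAT:
          return decimal.floatValue();
        case DOUBLE:
          return decimal.doubleValue();
        default:
          return decimal.toPlainString();
      }
    }
    if (target == TargetType.STRING) {
      // Stand-in for unwrapping a string wrapper type.
      return String.valueOf(value);
    }
    return value;
  }

  // Converts every element of a value list, e.g. the members of an IN (...) list.
  static List<Object> convertAll(List<?> values, TargetType target) {
    List<Object> converted = new ArrayList<>(values.size());
    for (Object value : values) {
      converted.add(convert(value, target));
    }
    return converted;
  }
}
```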





[GitHub] [pinot] siddharthteotia commented on a diff in pull request #8558: add support for project/filter pushdown

Posted by GitBox <gi...@apache.org>.
siddharthteotia commented on code in PR #8558:
URL: https://github.com/apache/pinot/pull/8558#discussion_r855469081


##########
pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTest.java:
##########
@@ -96,28 +85,19 @@ public void testQueryToStages()
   }
 
   @Test
-  public void testQueryToRel()
-      throws Exception {
-    PlannerContext plannerContext = new PlannerContext();
-    String query = "SELECT * FROM a JOIN b ON a.col1 = b.col2 WHERE a.col3 >= 0";
-    SqlNode parsed = _queryEnvironment.parse(query, plannerContext);
-    SqlNode validated = _queryEnvironment.validate(parsed);
-    RelRoot relRoot = _queryEnvironment.toRelation(validated, plannerContext);
-    RelNode optimized = _queryEnvironment.optimize(relRoot, plannerContext);
-
-    // Assert that relational plan can be written into a ALL-ATTRIBUTE digest.
-    StringWriter sw = new StringWriter();
-    PrintWriter pw = new PrintWriter(sw);
-    RelWriter planWriter = new RelXmlWriter(pw, SqlExplainLevel.ALL_ATTRIBUTES);
-    optimized.explain(planWriter);
-    Assert.assertNotNull(sw.toString());
+  public void testQueryProjectFilterPushdownForJoin() {
+    String query = "SELECT a.col1, a.ts, b.col2, b.col3 FROM a JOIN b ON a.col1 = b.col2 "
+        + "WHERE a.col3 >= 0 AND a.col2 IN  ('a', 'b') AND b.col3 < 0";
+    QueryPlan queryPlan = _queryEnvironment.planQuery(query);

Review Comment:
   Can we actually verify the multi-stage tree and also the logical `RelNode` tree, to make sure all incremental stages of planning are tested?





[GitHub] [pinot] walterddr commented on a diff in pull request #8558: add support for project/filter pushdown

Posted by GitBox <gi...@apache.org>.
walterddr commented on code in PR #8558:
URL: https://github.com/apache/pinot/pull/8558#discussion_r855479255


##########
pinot-query-planner/src/main/java/org/apache/pinot/query/rules/PinotExchangeNodeInsertRule.java:
##########
@@ -57,12 +60,21 @@ public boolean matches(RelOptRuleCall call) {
 
   @Override
   public void onMatch(RelOptRuleCall call) {
+    // TODO: this only works for single equality JOIN. add generic condition parser
     Join join = call.rel(0);
     RelNode leftInput = join.getInput(0);
     RelNode rightInput = join.getInput(1);
 
-    RelNode leftExchange = LogicalExchange.create(leftInput, RelDistributions.SINGLETON);
-    RelNode rightExchange = LogicalExchange.create(rightInput, RelDistributions.BROADCAST_DISTRIBUTED);
+    // TODO: this currently has a hard-coded exchange type. add exchange selection option
+//    RelNode lExchange = LogicalExchange.create(leftInput, RelDistributions.SINGLETON);
+//    RelNode rExchange = LogicalExchange.create(rightInput, RelDistributions.BROADCAST_DISTRIBUTED);
+    int leftOperandIndex = ((RexInputRef) ((RexCall) join.getCondition()).getOperands().get(0)).getIndex();
+    int rightOperandIndex = ((RexInputRef) ((RexCall) join.getCondition()).getOperands().get(1)).getIndex()
+        - join.getLeft().getRowType().getFieldNames().size();
+    RelNode leftExchange = LogicalExchange.create(leftInput,
+        RelDistributions.hash(Collections.singletonList(leftOperandIndex)));
+    RelNode rightExchange = LogicalExchange.create(rightInput,
+        RelDistributions.hash(Collections.singletonList(rightOperandIndex)));

Review Comment:
   add a placeholder to use SqlHint to determine whether to use broadcast or hash distribution
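
A minimal sketch of what such a hint-driven choice could look like, kept free of Calcite types so it is self-contained. Everything here is hypothetical: hints are modeled as plain strings, `Distribution` and `Choice` are illustrative types, and falling back to singleton/broadcast when the hint is absent is an assumption (it mirrors the exchange pair that the diff removes). The right-side key is offset by the left field count because, as in the diff, the join condition indexes into the concatenated left+right row.

```java
import java.util.Set;

// Hypothetical sketch; hints are modeled as strings rather than Calcite RelHint objects.
final class ExchangeChoiceSketch {
  enum Distribution { SINGLETON, BROADCAST, HASH }

  static final String USE_HASH_DISTRIBUTE = "USE_HASH_DISTRIBUTE";

  // Chosen distribution per join input, plus the key column on each side (-1 when unused).
  static final class Choice {
    final Distribution left;
    final Distribution right;
    final int leftKey;
    final int rightKey;

    Choice(Distribution left, Distribution right, int leftKey, int rightKey) {
      this.left = left;
      this.right = right;
      this.leftKey = leftKey;
      this.rightKey = rightKey;
    }
  }

  private ExchangeChoiceSketch() {
  }

  // leftRef and rightRef are column references into the concatenated (left + right) join row.
  static Choice choose(Set<String> hintNames, int leftRef, int rightRef, int leftFieldCount) {
    if (hintNames.contains(USE_HASH_DISTRIBUTE)) {
      // The right reference must be rebased onto the right input's own row.
      return new Choice(Distribution.HASH, Distribution.HASH, leftRef, rightRef - leftFieldCount);
    }
    // Assumed default: single left stream, right side broadcast to it.
    return new Choice(Distribution.SINGLETON, Distribution.BROADCAST, -1, -1);
  }
}
```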





[GitHub] [pinot] siddharthteotia commented on a diff in pull request #8558: add support for project/filter pushdown

Posted by GitBox <gi...@apache.org>.
siddharthteotia commented on code in PR #8558:
URL: https://github.com/apache/pinot/pull/8558#discussion_r855387615


##########
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/MailboxSendNode.java:
##########
@@ -18,21 +18,35 @@
  */
 package org.apache.pinot.query.planner.nodes;
 
+import javax.annotation.Nullable;
 import org.apache.calcite.rel.RelDistribution;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.pinot.query.planner.nodes.serde.ProtoProperties;
+import org.apache.pinot.query.planner.partitioning.KeySelector;
 
 
 public class MailboxSendNode extends AbstractStageNode {
+  @ProtoProperties
   private int _receiverStageId;
+  @ProtoProperties
   private RelDistribution.Type _exchangeType;
+  @ProtoProperties
+  private KeySelector<Object[], Object> _partitionKeySelector;
 
   public MailboxSendNode(int stageId) {
-    super(stageId);
+    super(stageId, null);

Review Comment:
   Is this null because we don't know it upfront at the time of stage plan creation? I would expect this information to be available.





[GitHub] [pinot] siddharthteotia commented on a diff in pull request #8558: add support for project/filter pushdown

Posted by GitBox <gi...@apache.org>.
siddharthteotia commented on code in PR #8558:
URL: https://github.com/apache/pinot/pull/8558#discussion_r855391781


##########
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/RexExpression.java:
##########
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner.nodes;
+
+import java.math.BigDecimal;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.util.NlsString;
+import org.apache.pinot.query.planner.nodes.serde.ProtoProperties;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+
+/**
+ * {@code RexExpression} is the serializable format of the {@link RexNode}.
+ */
+public abstract class RexExpression {

Review Comment:
   This is another reason why I suggested renaming this package to `query.planner.logical`: it holds more than Pinot `nodes`. Other code related to logical planning is also part of it. This can be done in a separate PR if we decide to do it.





[GitHub] [pinot] siddharthteotia commented on a diff in pull request #8558: add support for project/filter pushdown

Posted by GitBox <gi...@apache.org>.
siddharthteotia commented on code in PR #8558:
URL: https://github.com/apache/pinot/pull/8558#discussion_r855377344


##########
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/RelToStageConverter.java:
##########
@@ -24,16 +24,18 @@
 import java.util.stream.Collectors;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.JoinRelType;
-import org.apache.calcite.rel.logical.LogicalCalc;

Review Comment:
   Is `LogicalCalc` even useful? It provides essentially the same functionality as `LogicalProject` and `LogicalFilter` combined.
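
To illustrate the equivalence being claimed, the sketch below shows a fused "calc"-style pass next to a separate filter step followed by a project step; both produce the same rows. All names are hypothetical and rows are plain `Object[]`; this is not Calcite's implementation, only the idea that a calc applies a condition and a projection together.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical sketch: a fused calc versus separate filter and project steps.
final class CalcSketch {
  private CalcSketch() {
  }

  // Keeps only the rows that satisfy the condition.
  static List<Object[]> filter(List<Object[]> rows, Predicate<Object[]> condition) {
    List<Object[]> out = new ArrayList<>();
    for (Object[] row : rows) {
      if (condition.test(row)) {
        out.add(row);
      }
    }
    return out;
  }

  // Rewrites every row through the projection.
  static List<Object[]> project(List<Object[]> rows, Function<Object[], Object[]> projection) {
    List<Object[]> out = new ArrayList<>();
    for (Object[] row : rows) {
      out.add(projection.apply(row));
    }
    return out;
  }

  // Applies the condition and the projection in a single pass over the input.
  static List<Object[]> calc(List<Object[]> rows, Predicate<Object[]> condition,
      Function<Object[], Object[]> projection) {
    List<Object[]> out = new ArrayList<>();
    for (Object[] row : rows) {
      if (condition.test(row)) {
        out.add(projection.apply(row));
      }
    }
    return out;
  }
}
```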





[GitHub] [pinot] siddharthteotia commented on a diff in pull request #8558: add support for project/filter pushdown

Posted by GitBox <gi...@apache.org>.
siddharthteotia commented on code in PR #8558:
URL: https://github.com/apache/pinot/pull/8558#discussion_r855443180


##########
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/RexExpression.java:
##########
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner.nodes;
+
+import java.math.BigDecimal;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.util.NlsString;
+import org.apache.pinot.query.planner.nodes.serde.ProtoProperties;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+
+/**
+ * {@code RexExpression} is the serializable format of the {@link RexNode}.
+ */
+public abstract class RexExpression {
+  @ProtoProperties
+  protected SqlKind _sqlKind;
+  @ProtoProperties
+  protected RelDataType _dataType;
+
+  public SqlKind getKind() {
+    return _sqlKind;
+  }
+
+  public RelDataType getDataType() {
+    return _dataType;
+  }
+
+  public static RexExpression toRexExpression(RexNode rexNode) {
+    if (rexNode instanceof RexInputRef) {
+      return new RexExpression.InputRef(((RexInputRef) rexNode).getIndex());
+    } else if (rexNode instanceof RexLiteral) {
+      RexLiteral rexLiteral = ((RexLiteral) rexNode);
+      return new RexExpression.Literal(rexLiteral.getType(), rexLiteral.getTypeName(), rexLiteral.getValue());
+    } else if (rexNode instanceof RexCall) {
+      RexCall rexCall = (RexCall) rexNode;
+      List<RexExpression> operands = rexCall.getOperands().stream().map(RexExpression::toRexExpression)
+          .collect(Collectors.toList());
+      return new RexExpression.FunctionCall(rexCall.getKind(), rexCall.getType(), rexCall.getOperator().getName(),
+          operands);
+    } else {
+      throw new IllegalArgumentException("Unsupported RexNode type with SqlKind: " + rexNode.getKind());

Review Comment:
   Use the entire `RexNode` in the message instead of just its `SqlKind`?





[GitHub] [pinot] siddharthteotia commented on a diff in pull request #8558: add support for project/filter pushdown

Posted by GitBox <gi...@apache.org>.
siddharthteotia commented on code in PR #8558:
URL: https://github.com/apache/pinot/pull/8558#discussion_r855374289


##########
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/StagePlanner.java:
##########
@@ -67,11 +68,12 @@ public QueryPlan makePlan(RelNode relRoot) {
     StageNode globalStageRoot = walkRelPlan(relRoot, getNewStageId());
 
     // global root needs to send results back to the ROOT, a.k.a. the client response node.
-    // the last stage is always a broadcast-gather.
+    // the last stage only has one receiver so doesn't matter what the exchange type is.

Review Comment:
   I agree that the last exchange at the top (between the sender(s) in the 1st stage and the single receiver in the 0th stage) can be anything. Maybe `RelDistribution.Type.ANY` is more suitable?
   
   Slightly related question: I think the exchange type is enough to decide the kind of sender (right now it will be broadcast; later it could be shuffle / hash partition, etc.).
   
   How do we decide the exact kind of receiver? Especially for the receiver in the root stage, we ideally want to know whether the receiver is unordered or ordered, i.e. whether it does a simple merge or a sorted merge.
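
To make the simple vs. sorted merge distinction concrete, here is a minimal sketch of the two receiver styles. `ReceiverMergeSketch`, `simpleMerge` and `sortedMerge` are hypothetical names used only for illustration and are not part of this PR; rows are modelled as plain `Object[]`, and the sorted variant assumes every sender already emits its rows in the requested order.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical helper, not part of the PR: contrasts the two receiver-side merge styles.
final class ReceiverMergeSketch {
  private ReceiverMergeSketch() {
  }

  // Unordered receiver: append each sender's rows in the order the senders are listed.
  static List<Object[]> simpleMerge(List<List<Object[]>> senderBlocks) {
    List<Object[]> result = new ArrayList<>();
    for (List<Object[]> block : senderBlocks) {
      result.addAll(block);
    }
    return result;
  }

  // Ordered receiver: k-way merge. Assumes each sender's rows are already sorted by rowOrder.
  static List<Object[]> sortedMerge(List<List<Object[]>> senderBlocks, Comparator<Object[]> rowOrder) {
    // Heap entries are {senderIndex, rowIndex}, ordered by the row they point at.
    PriorityQueue<int[]> heap = new PriorityQueue<>(
        (a, b) -> rowOrder.compare(senderBlocks.get(a[0]).get(a[1]), senderBlocks.get(b[0]).get(b[1])));
    for (int i = 0; i < senderBlocks.size(); i++) {
      if (!senderBlocks.get(i).isEmpty()) {
        heap.add(new int[]{i, 0});
      }
    }
    List<Object[]> result = new ArrayList<>();
    while (!heap.isEmpty()) {
      int[] head = heap.poll();
      List<Object[]> block = senderBlocks.get(head[0]);
      result.add(block.get(head[1]));
      // Advance the cursor of the sender we just consumed from.
      if (head[1] + 1 < block.size()) {
        heap.add(new int[]{head[0], head[1] + 1});
      }
    }
    return result;
  }
}
```

The sender side needs no change between the two; only the receiver has to know whether an ordering is expected, which is why the exchange type alone does not answer the question.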



##########
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/StagePlanner.java:
##########
@@ -95,12 +97,15 @@ private StageNode walkRelPlan(RelNode node, int currentStageId) {
     if (isExchangeNode(node)) {
       // 1. exchangeNode always have only one input, get its input converted as a new stage root.
       StageNode nextStageRoot = walkRelPlan(node.getInput(0), getNewStageId());
-      RelDistribution.Type exchangeType = ((LogicalExchange) node).distribution.getType();
+      RelDistribution distribution = ((LogicalExchange) node).getDistribution();
+      RelDistribution.Type exchangeType = distribution.getType();
 
       // 2. make an exchange sender and receiver node pair
-      StageNode mailboxReceiver = new MailboxReceiveNode(currentStageId, nextStageRoot.getStageId(), exchangeType);
-      StageNode mailboxSender = new MailboxSendNode(nextStageRoot.getStageId(), mailboxReceiver.getStageId(),
+      StageNode mailboxReceiver = new MailboxReceiveNode(currentStageId, node.getRowType(), nextStageRoot.getStageId(),
           exchangeType);
+      StageNode mailboxSender = new MailboxSendNode(nextStageRoot.getStageId(), node.getRowType(),
+          mailboxReceiver.getStageId(), exchangeType, exchangeType == RelDistribution.Type.HASH_DISTRIBUTED
+          ? new FieldSelectionKeySelector(distribution.getKeys().get(0)) : null);

Review Comment:
   I thought that initially we were going to support only broadcast-based distribution / exchange?
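
For reference, the hash-distributed path in the hunk above boils down to "pick one key column, hash it, route the row". A minimal sketch of that idea follows, assuming a single key column as in `distribution.getKeys().get(0)`. The `KeySelector` interface, its `getKey` method, and the `partition` helper are stand-ins written for this example and may not match the actual classes in the PR.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins; the PR's KeySelector / FieldSelectionKeySelector may look different.
final class HashExchangeSketch {
  private HashExchangeSketch() {
  }

  // Extracts the partition key from a row.
  interface KeySelector<I, O> {
    O getKey(I input);
  }

  // Uses a single column of the row as the key.
  static final class FieldSelectionKeySelector implements KeySelector<Object[], Object> {
    private final int _columnIndex;

    FieldSelectionKeySelector(int columnIndex) {
      _columnIndex = columnIndex;
    }

    @Override
    public Object getKey(Object[] row) {
      return row[_columnIndex];
    }
  }

  // Routes every row to exactly one of numReceivers buckets based on the hash of its key.
  static List<List<Object[]>> partition(List<Object[]> rows, KeySelector<Object[], Object> selector,
      int numReceivers) {
    List<List<Object[]>> buckets = new ArrayList<>();
    for (int i = 0; i < numReceivers; i++) {
      buckets.add(new ArrayList<>());
    }
    for (Object[] row : rows) {
      Object key = selector.getKey(row);
      int hash = key == null ? 0 : key.hashCode();
      // floorMod keeps the bucket index non-negative even for negative hash codes.
      buckets.get(Math.floorMod(hash, numReceivers)).add(row);
    }
    return buckets;
  }
}
```

Rows with equal keys always land in the same bucket, which is the property a hash join on that key relies on; a broadcast exchange would instead copy every row to every receiver.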



##########
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/RelToStageConverter.java:
##########
@@ -24,16 +24,18 @@
 import java.util.stream.Collectors;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.JoinRelType;
-import org.apache.calcite.rel.logical.LogicalCalc;

Review Comment:
   Is `LogicalCalc` even useful? It provides essentially the same functionality as `LogicalProject` and `LogicalFilter` together.





[GitHub] [pinot] walterddr commented on a diff in pull request #8558: add support for project/filter pushdown

Posted by GitBox <gi...@apache.org>.
walterddr commented on code in PR #8558:
URL: https://github.com/apache/pinot/pull/8558#discussion_r855385678


##########
pinot-query-planner/src/main/java/org/apache/pinot/query/parser/CalciteExpressionParser.java:
##########
@@ -46,10 +45,10 @@
 
 
 /**
- * Calcite parser to convert SQL expressions into {@link Expression}.
+ * This class provide API to parse a SQL string into Pinot query {@link SqlNode}.

Review Comment:
   Sounds good. It is not used.





[GitHub] [pinot] walterddr commented on a diff in pull request #8558: add support for project/filter pushdown

Posted by GitBox <gi...@apache.org>.
walterddr commented on code in PR #8558:
URL: https://github.com/apache/pinot/pull/8558#discussion_r855386029


##########
pinot-common/src/main/proto/plan.proto:
##########
@@ -76,8 +76,9 @@ message LiteralField {
     bool boolField = 1;
     int32 intField = 2;
     int64 longField = 3;
-    double doubleField = 4;
-    string stringField = 5;
+    float floatField = 4;

Review Comment:
   Correct. This will be regenerated.





[GitHub] [pinot] walterddr commented on a diff in pull request #8558: add support for project/filter pushdown

Posted by GitBox <gi...@apache.org>.
walterddr commented on code in PR #8558:
URL: https://github.com/apache/pinot/pull/8558#discussion_r855482922


##########
pinot-query-planner/src/test/java/org/apache/pinot/query/planner/nodes/SerDeUtilsTest.java:
##########
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner.nodes;
+
+import java.lang.reflect.Field;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.common.proto.Plan;
+import org.apache.pinot.query.QueryEnvironmentTestBase;
+import org.apache.pinot.query.planner.QueryPlan;
+import org.apache.pinot.query.planner.nodes.serde.ProtoProperties;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class SerDeUtilsTest extends QueryEnvironmentTestBase {
+
+  @Test(dataProvider = "testQueryDataProvider")
+  public void testQueryToRel(String query)

Review Comment:
   ```suggestion
     public void testQueryStagePlanSerDe(String query)
   ```



##########
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/AbstractStageNode.java:
##########
@@ -20,18 +20,25 @@
 
 import java.util.ArrayList;
 import java.util.List;
+import org.apache.calcite.rel.type.RelDataType;
 import org.apache.pinot.common.proto.Plan;
+import org.apache.pinot.query.planner.nodes.serde.ProtoProperties;
 import org.apache.pinot.query.planner.nodes.serde.ProtoSerializable;
 import org.apache.pinot.query.planner.nodes.serde.ProtoSerializationUtils;
 
 
 public abstract class AbstractStageNode implements StageNode, ProtoSerializable {
 
+  @ProtoProperties
   protected final int _stageId;
+  @ProtoProperties
   protected final List<StageNode> _inputs;
+  @ProtoProperties
+  protected RelDataType _rowType;
 
-  public AbstractStageNode(int stageId) {
+  public AbstractStageNode(int stageId, RelDataType rowType) {

Review Comment:
   Get rid of this constructor.





[GitHub] [pinot] siddharthteotia commented on a diff in pull request #8558: add support for project/filter pushdown

Posted by GitBox <gi...@apache.org>.
siddharthteotia commented on code in PR #8558:
URL: https://github.com/apache/pinot/pull/8558#discussion_r855693573


##########
pinot-query-planner/src/main/java/org/apache/pinot/query/rules/PinotExchangeNodeInsertRule.java:
##########
@@ -57,12 +63,26 @@ public boolean matches(RelOptRuleCall call) {
 
   @Override
   public void onMatch(RelOptRuleCall call) {
+    // TODO: this only works for single equality JOIN. add generic condition parser
     Join join = call.rel(0);
     RelNode leftInput = join.getInput(0);
     RelNode rightInput = join.getInput(1);
 
-    RelNode leftExchange = LogicalExchange.create(leftInput, RelDistributions.SINGLETON);
-    RelNode rightExchange = LogicalExchange.create(rightInput, RelDistributions.BROADCAST_DISTRIBUTED);
+    RelNode leftExchange;
+    RelNode rightExchange;
+    List<RelHint> hints = join.getHints();
+    if (hints.contains(PinotRelationalHints.USE_HASH_JOIN)) {

Review Comment:
   Why is this an if-else? In the else branch we will still be doing a hash join in the current implementation.





[GitHub] [pinot] siddharthteotia commented on a diff in pull request #8558: add support for project/filter pushdown

Posted by GitBox <gi...@apache.org>.
siddharthteotia commented on code in PR #8558:
URL: https://github.com/apache/pinot/pull/8558#discussion_r855374289


##########
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/StagePlanner.java:
##########
@@ -67,11 +68,12 @@ public QueryPlan makePlan(RelNode relRoot) {
     StageNode globalStageRoot = walkRelPlan(relRoot, getNewStageId());
 
     // global root needs to send results back to the ROOT, a.k.a. the client response node.
-    // the last stage is always a broadcast-gather.
+    // the last stage only has one receiver so doesn't matter what the exchange type is.

Review Comment:
   I agree that the last exchange at the top (between the sender(s) in the 1st stage and the single receiver in the 0th stage) can be anything. Maybe `RelDistribution.Type.ANY` is more suitable?
   
   Slightly related question: I think the exchange type is enough to decide the kind of sender (right now it will be broadcast; later it could be a shuffle / hash partition exchange). How do we decide the exact kind of receiver? Especially for the receiver in the root stage, we ideally want to know whether the receiver is unordered or ordered, i.e. whether it does a simple merge or a sorted merge.





[GitHub] [pinot] siddharthteotia commented on a diff in pull request #8558: add support for project/filter pushdown

Posted by GitBox <gi...@apache.org>.
siddharthteotia commented on code in PR #8558:
URL: https://github.com/apache/pinot/pull/8558#discussion_r855377875


##########
pinot-common/src/main/proto/plan.proto:
##########
@@ -76,8 +76,9 @@ message LiteralField {
     bool boolField = 1;
     int32 intField = 2;
     int64 longField = 3;
-    double doubleField = 4;
-    string stringField = 5;
+    float floatField = 4;

Review Comment:
   The code needs to be regenerated (hopefully it doesn't complain about the reordering?), but I guess the generated code is gitignored.





[GitHub] [pinot] siddharthteotia commented on a diff in pull request #8558: add support for project/filter pushdown

Posted by GitBox <gi...@apache.org>.
siddharthteotia commented on code in PR #8558:
URL: https://github.com/apache/pinot/pull/8558#discussion_r855387954


##########
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/ProjectNode.java:
##########
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner.nodes;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexNode;
+import org.apache.pinot.query.planner.nodes.serde.ProtoProperties;
+
+
+public class ProjectNode extends AbstractStageNode {
+  @ProtoProperties
+  private List<RexExpression> _projects;
+
+  public ProjectNode(int stageId) {
+    super(stageId, null);

Review Comment:
   Same here





[GitHub] [pinot] siddharthteotia commented on a diff in pull request #8558: add support for project/filter pushdown

Posted by GitBox <gi...@apache.org>.
siddharthteotia commented on code in PR #8558:
URL: https://github.com/apache/pinot/pull/8558#discussion_r855388522


##########
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/MailboxSendNode.java:
##########
@@ -18,21 +18,35 @@
  */
 package org.apache.pinot.query.planner.nodes;
 
+import javax.annotation.Nullable;
 import org.apache.calcite.rel.RelDistribution;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.pinot.query.planner.nodes.serde.ProtoProperties;
+import org.apache.pinot.query.planner.partitioning.KeySelector;
 
 
 public class MailboxSendNode extends AbstractStageNode {
+  @ProtoProperties
   private int _receiverStageId;
+  @ProtoProperties
   private RelDistribution.Type _exchangeType;
+  @ProtoProperties
+  private KeySelector<Object[], Object> _partitionKeySelector;
 
   public MailboxSendNode(int stageId) {
-    super(stageId);
+    super(stageId, null);
   }
 
-  public MailboxSendNode(int stageId, int receiverStageId, RelDistribution.Type exchangeType) {
-    super(stageId);
+  public MailboxSendNode(int stageId, RelDataType rowType, int receiverStageId, RelDistribution.Type exchangeType) {

Review Comment:
   (nit) Please provide brief javadocs on the usage of both constructors.





[GitHub] [pinot] walterddr commented on a diff in pull request #8558: add support for project/filter pushdown

Posted by GitBox <gi...@apache.org>.
walterddr commented on code in PR #8558:
URL: https://github.com/apache/pinot/pull/8558#discussion_r855384407


##########
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/StagePlanner.java:
##########
@@ -67,11 +68,12 @@ public QueryPlan makePlan(RelNode relRoot) {
     StageNode globalStageRoot = walkRelPlan(relRoot, getNewStageId());
 
     // global root needs to send results back to the ROOT, a.k.a. the client response node.
-    // the last stage is always a broadcast-gather.
+    // the last stage only has one receiver so doesn't matter what the exchange type is.

Review Comment:
   For now it is always unordered, so I haven't added any attribute. We can add one when it is needed.





[GitHub] [pinot] walterddr commented on a diff in pull request #8558: add support for project/filter pushdown

Posted by GitBox <gi...@apache.org>.
walterddr commented on code in PR #8558:
URL: https://github.com/apache/pinot/pull/8558#discussion_r855385537


##########
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/RelToStageConverter.java:
##########
@@ -24,16 +24,18 @@
 import java.util.stream.Collectors;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.JoinRelType;
-import org.apache.calcite.rel.logical.LogicalCalc;

Review Comment:
   Nope, it is not used. I removed the calc rule, so it will not be generated.





[GitHub] [pinot] walterddr commented on a diff in pull request #8558: add support for project/filter pushdown

Posted by GitBox <gi...@apache.org>.
walterddr commented on code in PR #8558:
URL: https://github.com/apache/pinot/pull/8558#discussion_r855611429


##########
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/RexExpression.java:
##########
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner.nodes;
+
+import java.math.BigDecimal;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.util.NlsString;
+import org.apache.pinot.query.planner.nodes.serde.ProtoProperties;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+
+/**
+ * {@code RexExpression} is the serializable format of the {@link RexNode}.
+ */
+public abstract class RexExpression {
+  @ProtoProperties
+  protected SqlKind _sqlKind;
+  @ProtoProperties
+  protected RelDataType _dataType;
+
+  public SqlKind getKind() {
+    return _sqlKind;
+  }
+
+  public RelDataType getDataType() {
+    return _dataType;
+  }
+
+  public static RexExpression toRexExpression(RexNode rexNode) {
+    if (rexNode instanceof RexInputRef) {
+      return new RexExpression.InputRef(((RexInputRef) rexNode).getIndex());
+    } else if (rexNode instanceof RexLiteral) {
+      RexLiteral rexLiteral = ((RexLiteral) rexNode);
+      return new RexExpression.Literal(rexLiteral.getType(), rexLiteral.getTypeName(), rexLiteral.getValue());
+    } else if (rexNode instanceof RexCall) {
+      RexCall rexCall = (RexCall) rexNode;
+      List<RexExpression> operands = rexCall.getOperands().stream().map(RexExpression::toRexExpression)
+          .collect(Collectors.toList());
+      return new RexExpression.FunctionCall(rexCall.getKind(), rexCall.getType(), rexCall.getOperator().getName(),
+          operands);
+    } else {
+      throw new IllegalArgumentException("Unsupported RexNode type with SqlKind: " + rexNode.getKind());
+    }
+  }
+
+  private static Comparable convertLiteral(Comparable value, SqlTypeName sqlTypeName, RelDataType dataType) {
+    switch (sqlTypeName) {
+      case BOOLEAN:
+        return (boolean) value;
+      case DECIMAL:

Review Comment:
   It's actually the value that matters. We need to convert the wrapped literal values to Pinot-specific types.
   There are additional things that need to be addressed; for example, an `IN ('a', 'b')` is now encoded as a `Sarg('a', 'b')` where each string is actually an `NlsString`, so there is more to deal with.
   
   I will follow up with a separate PR on the literal conversion.
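
As a rough illustration of the kind of unwrapping meant here (not the follow-up PR itself), the sketch below converts Calcite-style literal values into plain Java values. To keep it self-contained it does not import Calcite: `TypeName` stands in for a subset of `SqlTypeName` and `WrappedString` stands in for `NlsString`, both hypothetical. It relies on the fact that Calcite hands numeric literal values back as `BigDecimal` and character literals as `NlsString`. `Sarg` handling is deliberately left out.

```java
import java.math.BigDecimal;

// Hypothetical sketch; the names below are stand-ins, not Calcite or Pinot classes.
final class LiteralConversionSketch {
  private LiteralConversionSketch() {
  }

  // Stand-in for the subset of Calcite's SqlTypeName handled here, plus one unhandled type.
  enum TypeName {
    BOOLEAN, INTEGER, BIGINT, DOUBLE, DECIMAL, CHAR, DATE
  }

  // Stand-in for Calcite's NlsString, which wraps the string value of a character literal.
  static final class WrappedString {
    private final String _value;

    WrappedString(String value) {
      _value = value;
    }

    String getValue() {
      return _value;
    }
  }

  // Unwraps a literal value into a plain Java value based on its declared type.
  static Object convert(Object value, TypeName typeName) {
    if (value == null) {
      return null;
    }
    switch (typeName) {
      case BOOLEAN:
        return (Boolean) value;
      case INTEGER:
        // Numeric literals arrive as BigDecimal; the *Exact variants fail loudly on overflow.
        return ((BigDecimal) value).intValueExact();
      case BIGINT:
        return ((BigDecimal) value).longValueExact();
      case DOUBLE:
        return ((BigDecimal) value).doubleValue();
      case DECIMAL:
        return value;
      case CHAR:
        return value instanceof WrappedString ? ((WrappedString) value).getValue() : value.toString();
      default:
        // Include the whole value, not just the type, to ease debugging.
        throw new IllegalArgumentException("Unsupported literal: " + value + " of type " + typeName);
    }
  }
}
```

A `Sarg` would need one more layer: iterate over its ranges or points and run each endpoint through the same conversion.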


