Posted to commits@pinot.apache.org by "KKcorps (via GitHub)" <gi...@apache.org> on 2023/03/13 08:32:47 UTC

[GitHub] [pinot] KKcorps opened a new pull request, #10413: Refactor: Pass context instead on individual arguments to operator

KKcorps opened a new pull request, #10413:
URL: https://github.com/apache/pinot/pull/10413

   This will allow us to easily add more params to operator constructors in the future without major refactoring.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] walterddr commented on a diff in pull request #10413: Refactor: Pass context instead on individual arguments to operator

Posted by "walterddr (via GitHub)" <gi...@apache.org>.
walterddr commented on code in PR #10413:
URL: https://github.com/apache/pinot/pull/10413#discussion_r1136500528


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java:
##########
@@ -277,26 +276,22 @@ public Object merge(Object agg, Object value) {
   private static class AggregateAccumulator extends AggregationUtils.Accumulator {
     private static final Map<String, Function<DataSchema.ColumnDataType, AggregationUtils.Merger>> AGG_MERGERS =
         ImmutableMap.<String, Function<DataSchema.ColumnDataType, AggregationUtils.Merger>>builder()
-            .putAll(AggregationUtils.Accumulator.MERGERS)
-            .put("FOURTHMOMENT",
+            .putAll(AggregationUtils.Accumulator.MERGERS).put("FOURTHMOMENT",
                 cdt -> cdt == DataSchema.ColumnDataType.OBJECT ? new MergeFourthMomentObject()
-                    : new MergeFourthMomentNumeric())
-            .put("$FOURTHMOMENT",
+                    : new MergeFourthMomentNumeric()).put("$FOURTHMOMENT",

Review Comment:
   nit: please revert the unrelated changes





[GitHub] [pinot] walterddr merged pull request #10413: Refactor: Pass context instead on individual arguments to operator

Posted by "walterddr (via GitHub)" <gi...@apache.org>.
walterddr merged PR #10413:
URL: https://github.com/apache/pinot/pull/10413




[GitHub] [pinot] KKcorps commented on pull request #10413: Refactor: Pass context instead on individual arguments to operator

Posted by "KKcorps (via GitHub)" <gi...@apache.org>.
KKcorps commented on PR #10413:
URL: https://github.com/apache/pinot/pull/10413#issuecomment-1465713096

   @walterddr should I change the test classes as well, or do we want to leave them as they are so that arguments can be changed easily for testing?




[GitHub] [pinot] KKcorps commented on pull request #10413: Refactor: Pass context instead on individual arguments to operator

Posted by "KKcorps (via GitHub)" <gi...@apache.org>.
KKcorps commented on PR #10413:
URL: https://github.com/apache/pinot/pull/10413#issuecomment-1471623728

   > @KKcorps can you address this:
   > 
   > > for the context object. can we put it as first argument?
   
   @somandal completed. Please take a look again.




[GitHub] [pinot] walterddr commented on a diff in pull request #10413: Refactor: Pass context instead on individual arguments to operator

Posted by "walterddr (via GitHub)" <gi...@apache.org>.
walterddr commented on code in PR #10413:
URL: https://github.com/apache/pinot/pull/10413#discussion_r1134688431


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PlanRequestContext.java:
##########
@@ -87,4 +87,8 @@ public void addReceivingMailboxes(List<MailboxIdentifier> ids) {
   public List<MailboxIdentifier> getReceivingMailboxes() {
     return ImmutableList.copyOf(_receivingMailboxes);
   }
+
+  public OperatorExecutionContext getOperatorExecutionContext() {
+    return new OperatorExecutionContext(this);
+  }

Review Comment:
   do not generate this lazily. create the operator execution context at construction time.
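A minimal sketch of what creating the context at construction time could look like, as opposed to returning a new object from every getter call. The class names `SketchPlanRequestContext` and `SketchExecutionContext` are hypothetical, simplified stand-ins and not the classes from the PR.

```java
// Hypothetical, simplified stand-ins for the classes discussed in the review.
final class SketchExecutionContext {
  private final long requestId;
  private final int stageId;

  SketchExecutionContext(long requestId, int stageId) {
    this.requestId = requestId;
    this.stageId = stageId;
  }

  long getRequestId() { return requestId; }
  int getStageId() { return stageId; }
}

final class SketchPlanRequestContext {
  // Built once in the constructor instead of on every getter call.
  private final SketchExecutionContext executionContext;

  SketchPlanRequestContext(long requestId, int stageId) {
    this.executionContext = new SketchExecutionContext(requestId, stageId);
  }

  SketchExecutionContext getExecutionContext() {
    // Every caller sees the same instance.
    return executionContext;
  }
}
```

With this shape, all operators built from one plan request share a single context object rather than each receiving a fresh copy.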



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java:
##########
@@ -45,6 +46,10 @@ public abstract class MultiStageOperator implements Operator<TransferableBlock>,
   protected final Map<String, OperatorStats> _operatorStatsMap;
   private final String _operatorId;
 
+  public MultiStageOperator(OperatorExecutionContext context) {
+    this(context.getRequestId(), context.getStageId(), context.getServer());
+  }

Review Comment:
   taking the context object directly is fine. no need to parse the 3 components out.
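One way to read this suggestion, sketched under assumptions: the base operator stores the context itself, and each subclass forwards it with `super(context)`, with the context placed first in the argument list. All names below (`SketchContext`, `SketchOperator`, `SketchFilterOperator`) are hypothetical and the server address is reduced to a plain string.

```java
// Hypothetical, simplified stand-ins; not the actual Pinot classes.
final class SketchContext {
  private final long requestId;
  private final int stageId;
  private final String server;

  SketchContext(long requestId, int stageId, String server) {
    this.requestId = requestId;
    this.stageId = stageId;
    this.server = server;
  }

  long getRequestId() { return requestId; }
  int getStageId() { return stageId; }
  String getServer() { return server; }
}

abstract class SketchOperator {
  // The base class keeps the whole context rather than three separate fields.
  protected final SketchContext context;

  protected SketchOperator(SketchContext context) {
    this.context = context;
  }

  String operatorId() {
    return context.getRequestId() + "_" + context.getStageId() + "_" + context.getServer();
  }
}

final class SketchFilterOperator extends SketchOperator {
  private final String filter;

  // Context first, operator-specific arguments after it.
  SketchFilterOperator(SketchContext context, String filter) {
    super(context);
    this.filter = filter;
  }

  String getFilter() { return filter; }
}
```

Adding a field to the context later would then leave every operator constructor signature unchanged.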



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java:
##########
@@ -81,10 +82,9 @@ public class AggregateOperator extends MultiStageOperator {
   // groupSet has to be a list of InputRef and cannot be null
   // TODO: Add these two checks when we confirm we can handle error in upstream ctor call.
   public AggregateOperator(MultiStageOperator inputOperator, DataSchema dataSchema, List<RexExpression> aggCalls,
-      List<RexExpression> groupSet, DataSchema inputSchema, long requestId, int stageId,
-      VirtualServerAddress virtualServerAddress) {
+      List<RexExpression> groupSet, DataSchema inputSchema, OperatorExecutionContext context) {
     this(inputOperator, dataSchema, aggCalls, groupSet, inputSchema, AggregateOperator.AggregateAccumulator.AGG_MERGERS,
-        requestId, stageId, virtualServerAddress);
+        context.getRequestId(), context.getStageId(), context.getServer());

Review Comment:
   make call via
   ```
   super(_operatorExecutionContext)
   ```



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OperatorExecutionContext.java:
##########
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.plan;
+
+import java.util.Map;
+import org.apache.pinot.query.mailbox.MailboxService;
+import org.apache.pinot.query.planner.StageMetadata;
+import org.apache.pinot.query.routing.VirtualServerAddress;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+
+
+public class OperatorExecutionContext {

Review Comment:
   also you could even call this `OpchainExecutionContext` since it is not tied to any specific operator
   (i am assuming operator-specific info is encoded in the abstract classes, such as the explain string)



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/FilterOperator.java:
##########
@@ -56,8 +56,8 @@ public class FilterOperator extends MultiStageOperator {
   private TransferableBlock _upstreamErrorBlock;
 
   public FilterOperator(MultiStageOperator upstreamOperator, DataSchema dataSchema, RexExpression filter,
-      long requestId, int stageId, VirtualServerAddress serverAddress) {
-    super(requestId, stageId, serverAddress);
+      OperatorExecutionContext context) {
+    super(context);

Review Comment:
   why are some operators calling super(context) and some calling the other API? let's make them consistent.



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OperatorExecutionContext.java:
##########
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.plan;
+
+import java.util.Map;
+import org.apache.pinot.query.mailbox.MailboxService;
+import org.apache.pinot.query.planner.StageMetadata;
+import org.apache.pinot.query.routing.VirtualServerAddress;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+
+
+public class OperatorExecutionContext {

Review Comment:
   needs javadoc. this class is intended for non-modifiable contextual info. we want to replace the constructor of the operator so that it is extensible.
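A rough illustration of a documented, non-modifiable context class, assuming final fields and a defensive copy of any collection passed in. `ImmutableContextSketch` and its `String` metadata values are placeholders chosen for the example, not the types used in the PR.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/**
 * Read-only contextual information shared by the operators of one chain.
 * Hypothetical sketch: fields are set once in the constructor and never change.
 */
final class ImmutableContextSketch {
  private final long requestId;
  private final int stageId;
  private final Map<Integer, String> metadataMap;

  ImmutableContextSketch(long requestId, int stageId, Map<Integer, String> metadataMap) {
    this.requestId = requestId;
    this.stageId = stageId;
    // Defensive copy: later changes to the caller's map are not visible here.
    this.metadataMap = Collections.unmodifiableMap(new HashMap<>(metadataMap));
  }

  long getRequestId() { return requestId; }
  int getStageId() { return stageId; }

  /** Returns an unmodifiable view; attempts to mutate it throw UnsupportedOperationException. */
  Map<Integer, String> getMetadataMap() { return metadataMap; }
}
```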





[GitHub] [pinot] somandal commented on a diff in pull request #10413: Refactor: Pass context instead on individual arguments to operator

Posted by "somandal (via GitHub)" <gi...@apache.org>.
somandal commented on code in PR #10413:
URL: https://github.com/apache/pinot/pull/10413#discussion_r1137630764


##########
pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java:
##########
@@ -84,11 +87,14 @@ public void shouldSwallowNoOpBlockFromUpstream()
       throws Exception {
     long deadlineMs = System.currentTimeMillis() + 10_000;
     // Given:
-    MailboxSendOperator operator = new MailboxSendOperator(_mailboxService, _input, ImmutableList.of(_server),
-        RelDistribution.Type.HASH_DISTRIBUTED, _selector,
+//    OpChainExecutionContext context =
+//        OperatorTestUtil.getContext(1, DEFAULT_SENDER_STAGE_ID, new VirtualServerAddress(_server));

Review Comment:
   looks like commented out code. can you clean this up?





[GitHub] [pinot] walterddr commented on a diff in pull request #10413: Refactor: Pass context instead on individual arguments to operator

Posted by "walterddr (via GitHub)" <gi...@apache.org>.
walterddr commented on code in PR #10413:
URL: https://github.com/apache/pinot/pull/10413#discussion_r1136500027


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java:
##########
@@ -66,25 +67,31 @@ public class MailboxReceiveOperator extends MultiStageOperator {
   private int _serverIdx;
   private TransferableBlock _upstreamErrorBlock;
 
-  private static MailboxIdentifier toMailboxId(VirtualServer sender, long jobId, int senderStageId,
-      int receiverStageId, VirtualServerAddress receiver) {
-    return new JsonMailboxIdentifier(
-        String.format("%s_%s", jobId, senderStageId),
-        new VirtualServerAddress(sender),
-        receiver,
-        senderStageId,
-        receiverStageId);
+  private static MailboxIdentifier toMailboxId(VirtualServer sender, long jobId, int senderStageId, int receiverStageId,
+      VirtualServerAddress receiver) {
+    return new JsonMailboxIdentifier(String.format("%s_%s", jobId, senderStageId), new VirtualServerAddress(sender),
+        receiver, senderStageId, receiverStageId);
+  }
+
+  public MailboxReceiveOperator(RelDistribution.Type exchangeType, int senderStageId, int receiverStageId,
+      OpChainExecutionContext context) {
+    this(context.getMetadataMap().get(senderStageId).getServerInstances(), exchangeType, senderStageId, receiverStageId,
+        context);
   }
 
   // TODO: Move deadlineInNanoSeconds to OperatorContext.
-  public MailboxReceiveOperator(MailboxService<TransferableBlock> mailboxService,
-      List<VirtualServer> sendingStageInstances, RelDistribution.Type exchangeType, VirtualServerAddress receiver,
-      long jobId, int senderStageId, int receiverStageId, Long timeoutMs) {
-    super(jobId, senderStageId, receiver);
-    _mailboxService = mailboxService;
+  public MailboxReceiveOperator(List<VirtualServer> sendingStageInstances, RelDistribution.Type exchangeType,
+      int senderStageId, int receiverStageId, OpChainExecutionContext context) {
+    super(context);
+    _mailboxService = context.getMailboxService();
+    VirtualServerAddress receiver = context.getServer();
+    long jobId = context.getRequestId();
     Preconditions.checkState(SUPPORTED_EXCHANGE_TYPES.contains(exchangeType),
         "Exchange/Distribution type: " + exchangeType + " is not supported!");
-    long timeoutNano = (timeoutMs != null ? timeoutMs : QueryConfig.DEFAULT_MAILBOX_TIMEOUT_MS) * 1_000_000L;
+    //TODO: Should I be using the Long.MAX_VALUE here instead of -1 or previous null check?
+    long timeoutNano =
+        (context.getTimeoutMs() != Long.MAX_VALUE ? context.getTimeoutMs() : QueryConfig.DEFAULT_MAILBOX_TIMEOUT_MS)

Review Comment:
   context should not box timeout but should really box deadline. we need a cleanup that converts all the timeouts to deadlines --> the deadline should be generated on the broker side when it dispatches.
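The difference between carrying a timeout and carrying a deadline can be sketched as follows. This is only an illustration under assumptions: `DeadlineSketch` is a hypothetical class, the clock reading is passed in explicitly, and the timeout is assumed small enough that the addition does not overflow.

```java
// Hypothetical sketch: an absolute deadline computed once at dispatch time.
final class DeadlineSketch {
  private final long deadlineMs;

  private DeadlineSketch(long deadlineMs) {
    this.deadlineMs = deadlineMs;
  }

  /** Computed once where the query is dispatched: deadline = now + timeout. */
  static DeadlineSketch fromTimeout(long nowMs, long timeoutMs) {
    // Assumes nowMs + timeoutMs does not overflow a long.
    return new DeadlineSketch(nowMs + timeoutMs);
  }

  long getDeadlineMs() { return deadlineMs; }

  /** Time left at the given instant, never negative. */
  long remainingMs(long nowMs) {
    return Math.max(0L, deadlineMs - nowMs);
  }

  boolean isExpired(long nowMs) {
    return nowMs >= deadlineMs;
  }
}
```

A relative timeout restarts wherever it is read, whereas an absolute deadline lets every stage compute its remaining budget against the same fixed point.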





[GitHub] [pinot] somandal commented on pull request #10413: Refactor: Pass context instead on individual arguments to operator

Posted by "somandal (via GitHub)" <gi...@apache.org>.
somandal commented on PR #10413:
URL: https://github.com/apache/pinot/pull/10413#issuecomment-1470661221

   @KKcorps can you address this:
   
   > for the context object. can we put it as first argument?




[GitHub] [pinot] KKcorps commented on a diff in pull request #10413: Refactor: Pass context instead on individual arguments to operator

Posted by "KKcorps (via GitHub)" <gi...@apache.org>.
KKcorps commented on code in PR #10413:
URL: https://github.com/apache/pinot/pull/10413#discussion_r1136125742


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java:
##########
@@ -66,25 +67,31 @@ public class MailboxReceiveOperator extends MultiStageOperator {
   private int _serverIdx;
   private TransferableBlock _upstreamErrorBlock;
 
-  private static MailboxIdentifier toMailboxId(VirtualServer sender, long jobId, int senderStageId,
-      int receiverStageId, VirtualServerAddress receiver) {
-    return new JsonMailboxIdentifier(
-        String.format("%s_%s", jobId, senderStageId),
-        new VirtualServerAddress(sender),
-        receiver,
-        senderStageId,
-        receiverStageId);
+  private static MailboxIdentifier toMailboxId(VirtualServer sender, long jobId, int senderStageId, int receiverStageId,
+      VirtualServerAddress receiver) {
+    return new JsonMailboxIdentifier(String.format("%s_%s", jobId, senderStageId), new VirtualServerAddress(sender),
+        receiver, senderStageId, receiverStageId);
+  }
+
+  public MailboxReceiveOperator(RelDistribution.Type exchangeType, int senderStageId, int receiverStageId,
+      OpChainExecutionContext context) {
+    this(context.getMetadataMap().get(senderStageId).getServerInstances(), exchangeType, senderStageId, receiverStageId,
+        context);
   }
 
   // TODO: Move deadlineInNanoSeconds to OperatorContext.
-  public MailboxReceiveOperator(MailboxService<TransferableBlock> mailboxService,
-      List<VirtualServer> sendingStageInstances, RelDistribution.Type exchangeType, VirtualServerAddress receiver,
-      long jobId, int senderStageId, int receiverStageId, Long timeoutMs) {
-    super(jobId, senderStageId, receiver);
-    _mailboxService = mailboxService;
+  public MailboxReceiveOperator(List<VirtualServer> sendingStageInstances, RelDistribution.Type exchangeType,
+      int senderStageId, int receiverStageId, OpChainExecutionContext context) {
+    super(context);
+    _mailboxService = context.getMailboxService();
+    VirtualServerAddress receiver = context.getServer();
+    long jobId = context.getRequestId();
     Preconditions.checkState(SUPPORTED_EXCHANGE_TYPES.contains(exchangeType),
         "Exchange/Distribution type: " + exchangeType + " is not supported!");
-    long timeoutNano = (timeoutMs != null ? timeoutMs : QueryConfig.DEFAULT_MAILBOX_TIMEOUT_MS) * 1_000_000L;
+    //TODO: Should I be using the Long.MAX_VALUE here instead of -1 or previous null check?
+    long timeoutNano =
+        (context.getTimeoutMs() != Long.MAX_VALUE ? context.getTimeoutMs() : QueryConfig.DEFAULT_MAILBOX_TIMEOUT_MS)

Review Comment:
   @walterddr What would you prefer here? Should I box the value in context itself? Not sure why it needs to be null 





[GitHub] [pinot] walterddr commented on a diff in pull request #10413: Refactor: Pass context instead on individual arguments to operator

Posted by "walterddr (via GitHub)" <gi...@apache.org>.
walterddr commented on code in PR #10413:
URL: https://github.com/apache/pinot/pull/10413#discussion_r1136500745


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java:
##########
@@ -66,25 +67,31 @@ public class MailboxReceiveOperator extends MultiStageOperator {
   private int _serverIdx;
   private TransferableBlock _upstreamErrorBlock;
 
-  private static MailboxIdentifier toMailboxId(VirtualServer sender, long jobId, int senderStageId,
-      int receiverStageId, VirtualServerAddress receiver) {
-    return new JsonMailboxIdentifier(
-        String.format("%s_%s", jobId, senderStageId),
-        new VirtualServerAddress(sender),
-        receiver,
-        senderStageId,
-        receiverStageId);
+  private static MailboxIdentifier toMailboxId(VirtualServer sender, long jobId, int senderStageId, int receiverStageId,
+      VirtualServerAddress receiver) {
+    return new JsonMailboxIdentifier(String.format("%s_%s", jobId, senderStageId), new VirtualServerAddress(sender),
+        receiver, senderStageId, receiverStageId);

Review Comment:
   same here. please revert irrelevant changes





[GitHub] [pinot] walterddr commented on a diff in pull request #10413: Refactor: Pass context instead on individual arguments to operator

Posted by "walterddr (via GitHub)" <gi...@apache.org>.
walterddr commented on code in PR #10413:
URL: https://github.com/apache/pinot/pull/10413#discussion_r1136501238


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java:
##########
@@ -66,25 +67,31 @@ public class MailboxReceiveOperator extends MultiStageOperator {
   private int _serverIdx;
   private TransferableBlock _upstreamErrorBlock;
 
-  private static MailboxIdentifier toMailboxId(VirtualServer sender, long jobId, int senderStageId,
-      int receiverStageId, VirtualServerAddress receiver) {
-    return new JsonMailboxIdentifier(
-        String.format("%s_%s", jobId, senderStageId),
-        new VirtualServerAddress(sender),
-        receiver,
-        senderStageId,
-        receiverStageId);
+  private static MailboxIdentifier toMailboxId(VirtualServer sender, long jobId, int senderStageId, int receiverStageId,
+      VirtualServerAddress receiver) {
+    return new JsonMailboxIdentifier(String.format("%s_%s", jobId, senderStageId), new VirtualServerAddress(sender),
+        receiver, senderStageId, receiverStageId);
+  }
+
+  public MailboxReceiveOperator(RelDistribution.Type exchangeType, int senderStageId, int receiverStageId,
+      OpChainExecutionContext context) {
+    this(context.getMetadataMap().get(senderStageId).getServerInstances(), exchangeType, senderStageId, receiverStageId,
+        context);
   }
 
   // TODO: Move deadlineInNanoSeconds to OperatorContext.
-  public MailboxReceiveOperator(MailboxService<TransferableBlock> mailboxService,
-      List<VirtualServer> sendingStageInstances, RelDistribution.Type exchangeType, VirtualServerAddress receiver,
-      long jobId, int senderStageId, int receiverStageId, Long timeoutMs) {
-    super(jobId, senderStageId, receiver);
-    _mailboxService = mailboxService;
+  public MailboxReceiveOperator(List<VirtualServer> sendingStageInstances, RelDistribution.Type exchangeType,
+      int senderStageId, int receiverStageId, OpChainExecutionContext context) {
+    super(context);
+    _mailboxService = context.getMailboxService();
+    VirtualServerAddress receiver = context.getServer();
+    long jobId = context.getRequestId();
     Preconditions.checkState(SUPPORTED_EXCHANGE_TYPES.contains(exchangeType),
         "Exchange/Distribution type: " + exchangeType + " is not supported!");
-    long timeoutNano = (timeoutMs != null ? timeoutMs : QueryConfig.DEFAULT_MAILBOX_TIMEOUT_MS) * 1_000_000L;
+    //TODO: Should I be using the Long.MAX_VALUE here instead of -1 or previous null check?
+    long timeoutNano =
+        (context.getTimeoutMs() != Long.MAX_VALUE ? context.getTimeoutMs() : QueryConfig.DEFAULT_MAILBOX_TIMEOUT_MS)

Review Comment:
   actually timeout can be unset. thus the boxing. so let's not change this one for now. 
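A small sketch of why the boxed `Long` is useful here: `null` can mean "unset", and the caller then falls back to a default, mirroring the null check on the removed line of the diff. `TimeoutSketch` and the value of `DEFAULT_TIMEOUT_MS` are assumptions made for this example; the real default lives in `QueryConfig.DEFAULT_MAILBOX_TIMEOUT_MS`.

```java
// Hypothetical sketch of the "unset timeout" handling.
final class TimeoutSketch {
  // Placeholder value for illustration only.
  static final long DEFAULT_TIMEOUT_MS = 10_000L;

  /** Returns the timeout in nanoseconds, using the default when the boxed value is unset (null). */
  static long toTimeoutNanos(Long timeoutMs) {
    long effectiveMs = timeoutMs != null ? timeoutMs : DEFAULT_TIMEOUT_MS;
    return effectiveMs * 1_000_000L;
  }
}
```

A primitive `long` would need a sentinel such as `Long.MAX_VALUE` or `-1` to express the same "unset" state, which is the question raised in the TODO above.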





[GitHub] [pinot] walterddr commented on a diff in pull request #10413: Refactor: Pass context instead on individual arguments to operator

Posted by "walterddr (via GitHub)" <gi...@apache.org>.
walterddr commented on code in PR #10413:
URL: https://github.com/apache/pinot/pull/10413#discussion_r1136501976


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java:
##########
@@ -68,24 +69,22 @@ interface MailboxIdGenerator {
     MailboxIdentifier generate(VirtualServer server);
   }
 
-  public MailboxSendOperator(MailboxService<TransferableBlock> mailboxService,
-      MultiStageOperator dataTableBlockBaseOperator, List<VirtualServer> receivingStageInstances,
-      RelDistribution.Type exchangeType, KeySelector<Object[], Object[]> keySelector,
-      VirtualServerAddress sendingServer, long jobId, int senderStageId, int receiverStageId, long deadlineMs) {
-    this(mailboxService, dataTableBlockBaseOperator, receivingStageInstances, exchangeType, keySelector,
-        server -> toMailboxId(server, jobId, senderStageId, receiverStageId, sendingServer), BlockExchange::getExchange,
-        jobId, senderStageId, receiverStageId, sendingServer, deadlineMs);
+  public MailboxSendOperator(MultiStageOperator dataTableBlockBaseOperator, RelDistribution.Type exchangeType,

Review Comment:
   nit: same here. please do not enable auto-reformat so that the PR can be cleaner. thank you





[GitHub] [pinot] codecov-commenter commented on pull request #10413: Refactor: Pass context instead on individual arguments to operator

Posted by "codecov-commenter (via GitHub)" <gi...@apache.org>.
codecov-commenter commented on PR #10413:
URL: https://github.com/apache/pinot/pull/10413#issuecomment-1465912120

   ## [Codecov](https://codecov.io/gh/apache/pinot/pull/10413?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#10413](https://codecov.io/gh/apache/pinot/pull/10413?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (2943150) into [master](https://codecov.io/gh/apache/pinot/commit/d475712d3136f3841169e66d2ad4da104717618b?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (d475712) will **decrease** coverage by `29.03%`.
   > The diff coverage is `0.00%`.
   
   ```diff
   @@              Coverage Diff              @@
   ##             master   #10413       +/-   ##
   =============================================
   - Coverage     64.25%   35.23%   -29.03%     
   + Complexity     6059      282     -5777     
   =============================================
     Files          1997     2052       +55     
     Lines        108768   111262     +2494     
     Branches      16620    16913      +293     
   =============================================
   - Hits          69888    39198    -30690     
   - Misses        33816    68660    +34844     
   + Partials       5064     3404     -1660     
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | integration1 | `24.47% <0.00%> (?)` | |
   | integration2 | `24.40% <0.00%> (?)` | |
   | unittests1 | `?` | |
   | unittests2 | `13.89% <0.00%> (+0.01%)` | :arrow_up: |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/pinot/pull/10413?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...inot/query/runtime/operator/AggregateOperator.java](https://codecov.io/gh/apache/pinot/pull/10413?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcXVlcnktcnVudGltZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvcnVudGltZS9vcGVyYXRvci9BZ2dyZWdhdGVPcGVyYXRvci5qYXZh) | `0.00% <0.00%> (-92.31%)` | :arrow_down: |
   | [...e/pinot/query/runtime/operator/FilterOperator.java](https://codecov.io/gh/apache/pinot/pull/10413?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcXVlcnktcnVudGltZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvcnVudGltZS9vcGVyYXRvci9GaWx0ZXJPcGVyYXRvci5qYXZh) | `0.00% <0.00%> (-92.60%)` | :arrow_down: |
   | [...pinot/query/runtime/operator/HashJoinOperator.java](https://codecov.io/gh/apache/pinot/pull/10413?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcXVlcnktcnVudGltZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvcnVudGltZS9vcGVyYXRvci9IYXNoSm9pbk9wZXJhdG9yLmphdmE=) | `0.00% <0.00%> (-95.00%)` | :arrow_down: |
   | [...e/operator/LeafStageTransferableBlockOperator.java](https://codecov.io/gh/apache/pinot/pull/10413?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcXVlcnktcnVudGltZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvcnVudGltZS9vcGVyYXRvci9MZWFmU3RhZ2VUcmFuc2ZlcmFibGVCbG9ja09wZXJhdG9yLmphdmE=) | `0.00% <0.00%> (-77.89%)` | :arrow_down: |
   | [...t/query/runtime/operator/LiteralValueOperator.java](https://codecov.io/gh/apache/pinot/pull/10413?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcXVlcnktcnVudGltZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvcnVudGltZS9vcGVyYXRvci9MaXRlcmFsVmFsdWVPcGVyYXRvci5qYXZh) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...query/runtime/operator/MailboxReceiveOperator.java](https://codecov.io/gh/apache/pinot/pull/10413?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcXVlcnktcnVudGltZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvcnVudGltZS9vcGVyYXRvci9NYWlsYm94UmVjZWl2ZU9wZXJhdG9yLmphdmE=) | `0.00% <0.00%> (-96.00%)` | :arrow_down: |
   | [...ot/query/runtime/operator/MailboxSendOperator.java](https://codecov.io/gh/apache/pinot/pull/10413?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcXVlcnktcnVudGltZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvcnVudGltZS9vcGVyYXRvci9NYWlsYm94U2VuZE9wZXJhdG9yLmphdmE=) | `0.00% <0.00%> (-72.23%)` | :arrow_down: |
   | [...not/query/runtime/operator/MultiStageOperator.java](https://codecov.io/gh/apache/pinot/pull/10413?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcXVlcnktcnVudGltZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvcnVudGltZS9vcGVyYXRvci9NdWx0aVN0YWdlT3BlcmF0b3IuamF2YQ==) | `0.00% <0.00%> (-79.55%)` | :arrow_down: |
   | [...g/apache/pinot/query/runtime/operator/OpChain.java](https://codecov.io/gh/apache/pinot/pull/10413?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcXVlcnktcnVudGltZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvcnVudGltZS9vcGVyYXRvci9PcENoYWluLmphdmE=) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...che/pinot/query/runtime/operator/SortOperator.java](https://codecov.io/gh/apache/pinot/pull/10413?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcXVlcnktcnVudGltZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvcnVudGltZS9vcGVyYXRvci9Tb3J0T3BlcmF0b3IuamF2YQ==) | `0.00% <0.00%> (-95.35%)` | :arrow_down: |
   | ... and [5 more](https://codecov.io/gh/apache/pinot/pull/10413?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   ... and [1449 files with indirect coverage changes](https://codecov.io/gh/apache/pinot/pull/10413/indirect-changes?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   




[GitHub] [pinot] somandal commented on a diff in pull request #10413: Refactor: Pass context instead on individual arguments to operator

Posted by "somandal (via GitHub)" <gi...@apache.org>.
somandal commented on code in PR #10413:
URL: https://github.com/apache/pinot/pull/10413#discussion_r1137630764


##########
pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java:
##########
@@ -84,11 +87,14 @@ public void shouldSwallowNoOpBlockFromUpstream()
       throws Exception {
     long deadlineMs = System.currentTimeMillis() + 10_000;
     // Given:
-    MailboxSendOperator operator = new MailboxSendOperator(_mailboxService, _input, ImmutableList.of(_server),
-        RelDistribution.Type.HASH_DISTRIBUTED, _selector,
+//    OpChainExecutionContext context =
+//        OperatorTestUtil.getContext(1, DEFAULT_SENDER_STAGE_ID, new VirtualServerAddress(_server));

Review Comment:
   Looks like commented-out code was left over here. Can you clean this up?





[GitHub] [pinot] walterddr commented on a diff in pull request #10413: Refactor: Pass context instead on individual arguments to operator

Posted by "walterddr (via GitHub)" <gi...@apache.org>.
walterddr commented on code in PR #10413:
URL: https://github.com/apache/pinot/pull/10413#discussion_r1136501238


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java:
##########
@@ -66,25 +67,31 @@ public class MailboxReceiveOperator extends MultiStageOperator {
   private int _serverIdx;
   private TransferableBlock _upstreamErrorBlock;
 
-  private static MailboxIdentifier toMailboxId(VirtualServer sender, long jobId, int senderStageId,
-      int receiverStageId, VirtualServerAddress receiver) {
-    return new JsonMailboxIdentifier(
-        String.format("%s_%s", jobId, senderStageId),
-        new VirtualServerAddress(sender),
-        receiver,
-        senderStageId,
-        receiverStageId);
+  private static MailboxIdentifier toMailboxId(VirtualServer sender, long jobId, int senderStageId, int receiverStageId,
+      VirtualServerAddress receiver) {
+    return new JsonMailboxIdentifier(String.format("%s_%s", jobId, senderStageId), new VirtualServerAddress(sender),
+        receiver, senderStageId, receiverStageId);
+  }
+
+  public MailboxReceiveOperator(RelDistribution.Type exchangeType, int senderStageId, int receiverStageId,
+      OpChainExecutionContext context) {
+    this(context.getMetadataMap().get(senderStageId).getServerInstances(), exchangeType, senderStageId, receiverStageId,
+        context);
   }
 
   // TODO: Move deadlineInNanoSeconds to OperatorContext.
-  public MailboxReceiveOperator(MailboxService<TransferableBlock> mailboxService,
-      List<VirtualServer> sendingStageInstances, RelDistribution.Type exchangeType, VirtualServerAddress receiver,
-      long jobId, int senderStageId, int receiverStageId, Long timeoutMs) {
-    super(jobId, senderStageId, receiver);
-    _mailboxService = mailboxService;
+  public MailboxReceiveOperator(List<VirtualServer> sendingStageInstances, RelDistribution.Type exchangeType,
+      int senderStageId, int receiverStageId, OpChainExecutionContext context) {
+    super(context);
+    _mailboxService = context.getMailboxService();
+    VirtualServerAddress receiver = context.getServer();
+    long jobId = context.getRequestId();
     Preconditions.checkState(SUPPORTED_EXCHANGE_TYPES.contains(exchangeType),
         "Exchange/Distribution type: " + exchangeType + " is not supported!");
-    long timeoutNano = (timeoutMs != null ? timeoutMs : QueryConfig.DEFAULT_MAILBOX_TIMEOUT_MS) * 1_000_000L;
+    //TODO: Should I be using the Long.MAX_VALUE here instead of -1 or previous null check?
+    long timeoutNano =
+        (context.getTimeoutMs() != Long.MAX_VALUE ? context.getTimeoutMs() : QueryConfig.DEFAULT_MAILBOX_TIMEOUT_MS)

Review Comment:
   Actually, the timeout can be unset, hence the boxed `Long`. Let's not change this one for now; we will clean it up later. Please add a TODO:
   `timeout in Mailbox is set differently from OpChainExecutionContext, needs clean up later`
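
   To illustrate the point about boxing, here is a minimal sketch of the original null-check behaviour, where a `null` timeout means "unset" and falls back to a default. The class name `MailboxTimeoutSketch` and the default value of 10 seconds are assumptions made for the example; the real default lives in `QueryConfig.DEFAULT_MAILBOX_TIMEOUT_MS` and may differ.

```java
// Sketch only: mirrors the removed line
//   (timeoutMs != null ? timeoutMs : QueryConfig.DEFAULT_MAILBOX_TIMEOUT_MS) * 1_000_000L
final class MailboxTimeoutSketch {
  // Hypothetical stand-in for QueryConfig.DEFAULT_MAILBOX_TIMEOUT_MS; the real value may differ.
  static final long DEFAULT_MAILBOX_TIMEOUT_MS = 10_000L;

  private MailboxTimeoutSketch() {
  }

  // A null timeoutMs means the caller did not set a timeout, so the default applies.
  // The result is converted from milliseconds to nanoseconds.
  static long toTimeoutNanos(Long timeoutMs) {
    long effectiveMs = timeoutMs != null ? timeoutMs : DEFAULT_MAILBOX_TIMEOUT_MS;
    return effectiveMs * 1_000_000L;
  }

  public static void main(String[] args) {
    System.out.println(toTimeoutNanos(null));  // unset: default is used
    System.out.println(toTimeoutNanos(250L));  // explicitly set
  }
}
```

   With a primitive `long` there is no `null` to signal "unset", which is why a sentinel such as `Long.MAX_VALUE` or `-1` would be needed instead; keeping the boxed type avoids picking a sentinel until the later cleanup.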





[GitHub] [pinot] somandal commented on a diff in pull request #10413: Refactor: Pass context instead on individual arguments to operator

Posted by "somandal (via GitHub)" <gi...@apache.org>.
somandal commented on code in PR #10413:
URL: https://github.com/apache/pinot/pull/10413#discussion_r1134569607


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OperatorExecutionContext.java:
##########
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.plan;
+
+import java.util.Map;
+import org.apache.pinot.query.mailbox.MailboxService;
+import org.apache.pinot.query.planner.StageMetadata;
+import org.apache.pinot.query.routing.VirtualServerAddress;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+
+
+public class OperatorExecutionContext {

Review Comment:
   nit: can you add Javadoc for this class?
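
   As a rough illustration of what class-level and accessor Javadoc could look like, here is a small self-contained sketch. `ExecutionContextSketch` and its three fields are hypothetical stand-ins chosen for the example; judging by its imports, the real `OperatorExecutionContext` also deals with a mailbox service, a server address, and stage metadata, which are left out here so the sketch compiles on its own.

```java
/**
 * Hypothetical, simplified stand-in for an operator execution context.
 *
 * <p>Bundles the per-request values that operators need, so that new values can be added
 * later without changing every operator constructor.
 */
final class ExecutionContextSketch {
  private final long _requestId;
  private final int _stageId;
  private final long _timeoutMs;

  /**
   * Creates a context for one stage of one request.
   *
   * @param requestId id of the query request this operator chain belongs to
   * @param stageId id of the stage being executed
   * @param timeoutMs timeout for the request, in milliseconds
   */
  ExecutionContextSketch(long requestId, int stageId, long timeoutMs) {
    _requestId = requestId;
    _stageId = stageId;
    _timeoutMs = timeoutMs;
  }

  /** Returns the id of the query request. */
  long getRequestId() {
    return _requestId;
  }

  /** Returns the id of the stage being executed. */
  int getStageId() {
    return _stageId;
  }

  /** Returns the request timeout in milliseconds. */
  long getTimeoutMs() {
    return _timeoutMs;
  }
}
```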


