Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2022/11/02 18:44:45 UTC

[GitHub] [pinot] agavra opened a new pull request, #9711: [multistage] partial operator chain execution

agavra opened a new pull request, #9711:
URL: https://github.com/apache/pinot/pull/9711

   See the [design doc](https://docs.google.com/document/d/1XAMHAlhFbINvX-kK1ANlzbRz4_RkS0map4qhqs1yDtE/edit#) for a big picture view.
   
   This is the first PR in a series of PRs to improve our execution model. It implements "partial execution" of operator chains by allowing them to return a "noop" `MetadataBlock` when there is either no data to process or no data to output.
   
   This PR introduces no functional change, because the `WorkerQueryExecutor` doesn't yet take advantage of partial execution: whenever it receives a noop block, it simply calls `operator#nextBlock` again.
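   A minimal sketch of the pull loop described above, using simplified stand-in types. `SketchBlock`, `SketchOperator` and `NoopAwareDriver` are hypothetical names, not Pinot classes. A noop block only means "nothing to emit yet", so the driver asks for the next block again, and only an EOS block ends the loop.

   ```java
   import java.util.ArrayList;
   import java.util.List;

   // Hypothetical stand-ins for TransferableBlock/Operator; not Pinot's actual API.
   enum SketchBlockKind { DATA, NOOP, EOS }

   final class SketchBlock {
     final SketchBlockKind kind;
     final String payload; // only set for DATA blocks

     SketchBlock(SketchBlockKind kind, String payload) {
       this.kind = kind;
       this.payload = payload;
     }
   }

   interface SketchOperator {
     SketchBlock nextBlock();
   }

   final class NoopAwareDriver {
     /** Pulls blocks until EOS, skipping NOOP blocks instead of treating them as output. */
     static List<String> drain(SketchOperator root) {
       List<String> results = new ArrayList<>();
       while (true) {
         SketchBlock block = root.nextBlock();
         switch (block.kind) {
           case NOOP:
             // Nothing to emit yet; a future scheduler could yield the task here instead of calling again.
             continue;
           case EOS:
             return results;
           default:
             results.add(block.payload);
         }
       }
     }
   }
   ```

   Because `drain` immediately asks again after a NOOP, it behaves like today's executor; the benefit of the noop signal only appears once a scheduler parks the task instead.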
   
   ### PR Review Guide
   
   This PR is broken into 3 functional commits, which I recommend you review in order:
   
   1. The first commit supports different types of `MetadataBlock`. Previously there were two, the EOS block and the ERROR block, and they were distinguished by the presence of the exception map. This commit uses the variable-size data bytes in `BaseDataBlock` to encode a JSON object with additional metadata. It's a little hacky, but it is very localized, gives us a lot of flexibility in how we use metadata blocks going forward, and maintains backwards compatibility with the existing code. Specifically, it introduces a `NOOP` metadata block type that the future scheduler will use as a signal that the task has completed all the processing it can do for the moment.
   2. The second commit pipes the noop metadata blocks up the operator chain. Two kinds of operators produce noop metadata blocks: (a) the `MailboxReceiveOperator`, when none of its mailboxes has data available, and (b) stateful operators such as Sort/HashJoin, which can process a block without producing anything (they need to process all blocks before producing output).
   3. The third commit adds tests and fixes up a few things in `MetadataBlock`, specifically making sure that it is backwards compatible with the existing code.
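   The backwards-compatible type resolution from the first commit can be approximated as follows. `MetadataTypeCodec` and `SketchMetadataType` are hypothetical names, and a naive regex stands in for the Jackson-serialized `Contents` object used by the real `MetadataBlock`. Legacy blocks carry no encoded type, so the type falls back to EOS when the exception map is empty and to ERROR otherwise.

   ```java
   import java.nio.charset.StandardCharsets;
   import java.util.Map;
   import java.util.regex.Matcher;
   import java.util.regex.Pattern;

   // Hypothetical sketch; the real MetadataBlock uses Jackson and a Contents class.
   enum SketchMetadataType { EOS, ERROR, NOOP }

   final class MetadataTypeCodec {
     // Naive stand-in for JSON parsing: only looks for a "type" string field.
     private static final Pattern TYPE_FIELD = Pattern.compile("\"type\"\\s*:\\s*\"([A-Z]+)\"");

     /** Encodes the type as a small JSON object, e.g. {"type":"NOOP"}. */
     static byte[] encode(SketchMetadataType type) {
       return ("{\"type\":\"" + type.name() + "\"}").getBytes(StandardCharsets.UTF_8);
     }

     /** Returns the encoded type name, or null for legacy payloads that carry no type. */
     static String decodeTypeName(byte[] variableSizeBytes) {
       if (variableSizeBytes == null) {
         return null;
       }
       Matcher m = TYPE_FIELD.matcher(new String(variableSizeBytes, StandardCharsets.UTF_8));
       return m.find() ? m.group(1) : null;
     }

     /** Same fallback rule as getType() in the diff: no type means EOS, unless exceptions are present. */
     static SketchMetadataType resolve(byte[] variableSizeBytes, Map<Integer, String> exceptions) {
       String name = decodeTypeName(variableSizeBytes);
       if (name == null) {
         return exceptions.isEmpty() ? SketchMetadataType.EOS : SketchMetadataType.ERROR;
       }
       // valueOf throws IllegalArgumentException for a type name this version does not know.
       return SketchMetadataType.valueOf(name);
     }
   }
   ```

   Note that old readers receiving a new block type they don't recognize would still fail in `valueOf`; the fallback only covers new readers receiving legacy blocks.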
   
   ### Testing
   
   We don't currently have any tests for the operators in the multistage engine, so it was hard to add them in this PR. I will follow up with a PR dedicated to testing operators.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] agavra commented on a diff in pull request #9711: [multistage] partial operator chain execution

Posted by GitBox <gi...@apache.org>.
agavra commented on code in PR #9711:
URL: https://github.com/apache/pinot/pull/9711#discussion_r1014317993


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java:
##########
@@ -91,6 +93,7 @@ public AggregateOperator(Operator<TransferableBlock> inputOperator, DataSchema d
     }
     _resultSchema = dataSchema;
 
+    _readyToConstruct = false;
     _isCumulativeBlockConstructed = false;

Review Comment:
   it's not actually the same thing 😢 The first time you hit `produce` you have `_readyToConstruct && !_isCumulativeBlockConstructed`, so you build and return the block. The second time, you have `_readyToConstruct && _isCumulativeBlockConstructed`, so you return an `EOS` block (this is one of the reasons the generic state machine is complicated).
   
   I can rename it to `_hasReturnedAggregateBlock` if that's clearer
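   The three states described above can be sketched with a toy aggregate that sums longs. `AggregateStateSketch` is a hypothetical class, and plain strings stand in for the NOOP, result and EOS blocks. The point is that `_readyToConstruct` alone cannot distinguish "build and return the result" from "already returned it, send EOS".

   ```java
   // Hypothetical sketch of the two-flag state machine; not AggregateOperator's actual code.
   final class AggregateStateSketch {
     private boolean _readyToConstruct = false;          // upstream sent EOS, all input consumed
     private boolean _hasReturnedAggregateBlock = false; // the result block was already emitted
     private long _sum = 0;

     /** Consumes one input value; null stands in for the upstream EOS block. */
     void consume(Long value) {
       if (value == null) {
         _readyToConstruct = true;
       } else {
         _sum += value;
       }
     }

     String produce() {
       if (!_readyToConstruct) {
         return "NOOP"; // still waiting on input
       }
       if (!_hasReturnedAggregateBlock) {
         _hasReturnedAggregateBlock = true;
         return "RESULT:" + _sum; // first call after all input: build and return the block
       }
       return "EOS"; // every later call: the result has already gone out
     }
   }
   ```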




[GitHub] [pinot] walterddr commented on a diff in pull request #9711: [multistage] partial operator chain execution

Posted by GitBox <gi...@apache.org>.
walterddr commented on code in PR #9711:
URL: https://github.com/apache/pinot/pull/9711#discussion_r1012454160


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java:
##########
@@ -116,6 +119,11 @@ public String toExplainString() {
   protected TransferableBlock getNextBlock() {
     try {
       consumeInputBlocks();
+
+      if (!_readyToConstruct) {
+        return TransferableBlockUtils.getNoOpTransferableBlock();
+      }
+
       return produceAggregatedBlock();

Review Comment:
   wondering if we can wrap these with a BaseTransferrableOperator that handles metadata blocks (NOOP, ERROR, not sure if possible for EOS)
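   One way the suggested base class could look, as a template method: subclasses only consume input and produce a result, while the base class turns "not ready yet" into a NOOP block and an exception into a sticky ERROR block. All names here are hypothetical, and EOS handling is left to subclasses, since the comment leaves open whether it fits.

   ```java
   // Hypothetical sketch of the suggested base operator; none of these names exist in Pinot.
   abstract class MetadataAwareOperatorSketch {
     enum Kind { DATA, NOOP, ERROR, EOS }

     static final class Block {
       final Kind kind;
       final String payload; // data for DATA blocks, the error message for ERROR blocks

       Block(Kind kind, String payload) {
         this.kind = kind;
         this.payload = payload;
       }
     }

     private Block _upstreamErrorBlock;

     /** Consumes whatever input is available; returns true once the operator can emit a result. */
     protected abstract boolean consumeInput() throws Exception;

     /** Builds the next output block (including EOS); only called after consumeInput() returned true. */
     protected abstract Block produce();

     /** Metadata handling that would otherwise be repeated in every operator lives here once. */
     public final Block nextBlock() {
       if (_upstreamErrorBlock != null) {
         return _upstreamErrorBlock;
       }
       try {
         if (!consumeInput()) {
           return new Block(Kind.NOOP, null);
         }
         return produce();
       } catch (Exception e) {
         _upstreamErrorBlock = new Block(Kind.ERROR, e.getMessage());
         return _upstreamErrorBlock;
       }
     }
   }
   ```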



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java:
##########
@@ -137,25 +137,31 @@ public static List<DataTable> reduceMailboxReceive(MailboxReceiveOperator mailbo
           throw new RuntimeException("Received error query execution result block: "
               + transferableBlock.getDataBlock().getExceptions());
       }
+      if (transferableBlock.isNoOpBlock()) {
+        continue;
+      } else if (transferableBlock.isEndOfStreamBlock()) {
+        return resultDataBlocks;
+      }

Review Comment:
   should these come after the null check?




[GitHub] [pinot] agavra commented on a diff in pull request #9711: [multistage] partial operator chain execution

Posted by GitBox <gi...@apache.org>.
agavra commented on code in PR #9711:
URL: https://github.com/apache/pinot/pull/9711#discussion_r1013053778


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java:
##########
@@ -116,6 +119,11 @@ public String toExplainString() {
   protected TransferableBlock getNextBlock() {
     try {
       consumeInputBlocks();
+
+      if (!_readyToConstruct) {

Review Comment:
   I like this idea a lot; I'll play around with it today




[GitHub] [pinot] walterddr commented on a diff in pull request #9711: [multistage] partial operator chain execution

Posted by GitBox <gi...@apache.org>.
walterddr commented on code in PR #9711:
URL: https://github.com/apache/pinot/pull/9711#discussion_r1014148282


##########
pinot-common/src/main/java/org/apache/pinot/common/datablock/MetadataBlock.java:
##########
@@ -18,28 +18,118 @@
  */
 package org.apache.pinot.common.datablock;
 
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import org.apache.pinot.common.utils.DataSchema;
 
 
 /**
- * Wrapper for row-wise data table. It stores data in row-major format.
+ * A block type to indicate some metadata about the current processing state.
+ * For the different types of metadata blocks see {@link MetadataBlockType}.
  */
 public class MetadataBlock extends BaseDataBlock {
-  private static final int VERSION = 1;
 
-  public MetadataBlock() {
-    super(0, null, new String[0], new byte[]{0}, new byte[]{0});
+  private static final ObjectMapper JSON = new ObjectMapper();
+
+  @VisibleForTesting
+  static final int VERSION = 1;
+
+  public enum MetadataBlockType {

Review Comment:
   this is a good design; next we can add different metadata block types!




[GitHub] [pinot] walterddr commented on a diff in pull request #9711: [multistage] partial operator chain execution

Posted by GitBox <gi...@apache.org>.
walterddr commented on code in PR #9711:
URL: https://github.com/apache/pinot/pull/9711#discussion_r1014154848


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlock.java:
##########
@@ -149,7 +150,23 @@ public BaseDataBlock.Type getType() {
    * @return whether this block is the end of stream.
    */
   public boolean isEndOfStreamBlock() {
-    return _type == BaseDataBlock.Type.METADATA;
+    if (_isErrorBlock) {
+      return true;
+    } else if (_type != BaseDataBlock.Type.METADATA) {
+      return false;
+    }
+
+    MetadataBlock metadata = (MetadataBlock) _dataBlock;
+    return metadata.getType() == MetadataBlock.MetadataBlockType.EOS;
+  }
+
+  public boolean isNoOpBlock() {
+    if (_type != BaseDataBlock.Type.METADATA) {
+      return false;
+    }
+
+    MetadataBlock metadata = (MetadataBlock) _dataBlock;
+    return metadata.getType() == MetadataBlock.MetadataBlockType.NOOP;
   }
 
   /**

Review Comment:
   let's also change `isErrorBlock()`: we no longer need a special boolean flag, since the error is saved in the metadata block anyway
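   The suggestion can be sketched with a simplified block model in which every predicate reads the single metadata type, so no separate `_isErrorBlock` flag is needed. `TransferableBlockSketch` and its nested enums are hypothetical; as in the diff, an error block also counts as end of stream.

   ```java
   // Hypothetical simplified model; not Pinot's TransferableBlock.
   final class TransferableBlockSketch {
     enum DataBlockType { ROW, METADATA }
     enum MetadataType { EOS, ERROR, NOOP }

     private final DataBlockType _type;
     private final MetadataType _metadataType; // null unless _type == METADATA

     TransferableBlockSketch(DataBlockType type, MetadataType metadataType) {
       _type = type;
       _metadataType = metadataType;
     }

     // No boolean flag: the error state is derived from the metadata type.
     boolean isErrorBlock() {
       return hasMetadataType(MetadataType.ERROR);
     }

     boolean isNoOpBlock() {
       return hasMetadataType(MetadataType.NOOP);
     }

     // An error block also terminates the stream.
     boolean isEndOfStreamBlock() {
       return hasMetadataType(MetadataType.EOS) || isErrorBlock();
     }

     private boolean hasMetadataType(MetadataType expected) {
       return _type == DataBlockType.METADATA && _metadataType == expected;
     }
   }
   ```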



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java:
##########
@@ -91,6 +93,7 @@ public AggregateOperator(Operator<TransferableBlock> inputOperator, DataSchema d
     }
     _resultSchema = dataSchema;
 
+    _readyToConstruct = false;
     _isCumulativeBlockConstructed = false;

Review Comment:
   we can remove `_isCumulativeBlockConstructed`; in the current context it means `!_readyToConstruct`



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java:
##########
@@ -137,25 +137,31 @@ public static List<DataTable> reduceMailboxReceive(MailboxReceiveOperator mailbo
           throw new RuntimeException("Received error query execution result block: "
               + transferableBlock.getDataBlock().getExceptions());
       }
+      if (transferableBlock.isNoOpBlock()) {
+        continue;
+      } else if (transferableBlock.isEndOfStreamBlock()) {
+        return resultDataBlocks;
+      }

Review Comment:
   in `transferableBlock.isNoOpBlock()` it checks `metadataBlock.getType() == MetadataBlock.MetadataBlockType.NOOP`. However, metadataBlock can be null, which would throw an NPE, no?
   
   should we have a null check, e.g. `transferableBlock.getDataBlock() != null`?
   
   previously the null check was not necessary b/c it only looked at the `BaseDataBlock.Type _type` member variable, which cannot be null in TransferableBlock
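   A null-safe version of the check can be sketched as below, assuming (as the comment suggests) that a block's data-block reference may be null. The never-null type discriminator and the null case are tested before the reference is dereferenced. `NullSafeBlockSketch` and its nested types are hypothetical.

   ```java
   // Hypothetical sketch; not Pinot's TransferableBlock.
   final class NullSafeBlockSketch {
     enum DataBlockType { ROW, METADATA }
     enum MetadataType { EOS, ERROR, NOOP }

     static final class MetadataBlockStub {
       final MetadataType type;

       MetadataBlockStub(MetadataType type) {
         this.type = type;
       }
     }

     private final DataBlockType _type;          // never null
     private final MetadataBlockStub _dataBlock; // may be null (assumption)

     NullSafeBlockSketch(DataBlockType type, MetadataBlockStub dataBlock) {
       _type = type;
       _dataBlock = dataBlock;
     }

     boolean isNoOpBlock() {
       return hasMetadataType(MetadataType.NOOP);
     }

     boolean isEndOfStreamBlock() {
       return hasMetadataType(MetadataType.EOS);
     }

     private boolean hasMetadataType(MetadataType expected) {
       // Short-circuiting means a missing data block is never dereferenced.
       return _type == DataBlockType.METADATA && _dataBlock != null && _dataBlock.type == expected;
     }
   }
   ```

   Treating a missing data block as neither NOOP nor EOS is one choice; falling back to the old `_type`-only check is another.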



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java:
##########
@@ -99,9 +98,13 @@ public String toExplainString() {
   protected TransferableBlock getNextBlock() {
     // Build JOIN hash table
     buildBroadcastHashTable();
+
     if (_upstreamErrorBlock != null) {
       return _upstreamErrorBlock;
+    } else if (!_isHashTableBuilt) {
+      return TransferableBlockUtils.getNoOpTransferableBlock();
     }
+

Review Comment:
   nit: can we pull the `_isHashTableBuilt` boolean check out of the private method and into here? It makes the logic cleaner.
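   The suggestion to move the `_isHashTableBuilt` check into `getNextBlock` can be sketched with a toy broadcast join that builds its hash table across calls. Assumptions: `IncrementalHashJoinSketch` is hypothetical, rows are `{key, value}` int pairs, the build side is consumed one row per call, and `Optional.empty()` stands in for a NOOP block. The private method only mutates state; the caller owns the readiness check.

   ```java
   import java.util.ArrayList;
   import java.util.HashMap;
   import java.util.Iterator;
   import java.util.List;
   import java.util.Map;
   import java.util.Optional;

   // Hypothetical sketch; not HashJoinOperator's actual code.
   final class IncrementalHashJoinSketch {
     private final Iterator<int[]> _rightRows; // build side; exhaustion stands in for EOS
     private final Map<Integer, List<Integer>> _hashTable = new HashMap<>();
     private boolean _isHashTableBuilt = false;

     IncrementalHashJoinSketch(Iterator<int[]> rightRows) {
       _rightRows = rightRows;
     }

     /** Consumes at most one build-side row per call, so each call does a bounded amount of work. */
     private void buildBroadcastHashTable() {
       if (_rightRows.hasNext()) {
         int[] row = _rightRows.next();
         _hashTable.computeIfAbsent(row[0], k -> new ArrayList<>()).add(row[1]);
       } else {
         _isHashTableBuilt = true;
       }
     }

     /** Optional.empty() stands in for a NOOP block; otherwise returns the matches for the probe key. */
     Optional<List<Integer>> getNextBlock(int probeKey) {
       if (!_isHashTableBuilt) {
         buildBroadcastHashTable();
         // The readiness check lives here rather than inside the private method.
         if (!_isHashTableBuilt) {
           return Optional.empty();
         }
       }
       return Optional.of(_hashTable.getOrDefault(probeKey, List.of()));
     }
   }
   ```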



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java:
##########
@@ -116,6 +119,11 @@ public String toExplainString() {
   protected TransferableBlock getNextBlock() {
     try {
       consumeInputBlocks();

Review Comment:
   nit: can we pull all the `_readyToConstruct` boolean checks up from the private methods to here? It makes the logic cleaner.



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java:
##########
@@ -111,37 +114,46 @@ public String toExplainString() {
   protected TransferableBlock getNextBlock() {
     if (_upstreamErrorBlock != null) {
       return _upstreamErrorBlock;
+    } else if (System.nanoTime() >= _timeout) {
+      LOGGER.error("Timed out after polling mailboxes: {}", _sendingStageInstances);
+      return TransferableBlockUtils.getErrorTransferableBlock(QueryException.EXECUTION_TIMEOUT_ERROR);
     }
-    // TODO: do a round robin check against all MailboxContentStreamObservers and find which one that has data.
-    boolean hasOpenedMailbox = true;
-    long timeoutWatermark = System.nanoTime() + _timeout;
-    while (hasOpenedMailbox && System.nanoTime() < timeoutWatermark) {
-      hasOpenedMailbox = false;
-      for (ServerInstance sendingInstance : _sendingStageInstances) {
-        try {
-          ReceivingMailbox<TransferableBlock> receivingMailbox =
-              _mailboxService.getReceivingMailbox(toMailboxId(sendingInstance));
-          // TODO this is not threadsafe.
-          // make sure only one thread is checking receiving mailbox and calling receive() then close()
-          if (!receivingMailbox.isClosed()) {
-            hasOpenedMailbox = true;
-            TransferableBlock transferableBlock = receivingMailbox.receive();
-            if (transferableBlock != null && !transferableBlock.isEndOfStreamBlock()) {
-              // Return the block only if it has some valid data
-              return transferableBlock;
+
+    int startingIdx = _serverIdx;
+    int openMailboxCount = 0;
+    int eosCount = 0;
+
+    for (int i = 0; i < _sendingStageInstances.size(); i++) {
+      // this implements a round-robin mailbox iterator so we don't starve any mailboxes
+      _serverIdx = (startingIdx + i) % _sendingStageInstances.size();
+
+      ServerInstance server = _sendingStageInstances.get(_serverIdx);
+      try {
+        ReceivingMailbox<TransferableBlock> mailbox = _mailboxService.getReceivingMailbox(toMailboxId(server));
+        if (!mailbox.isClosed()) {
+          openMailboxCount++;
+
+          // this is blocking for 100ms and may return null
+          TransferableBlock block = mailbox.receive();
+          if (block != null) {
+            if (!block.isEndOfStreamBlock()) {
+              return block;
             }
+            eosCount++;
           }
-        } catch (Exception e) {
-          LOGGER.error(String.format("Error receiving data from mailbox %s", sendingInstance), e);
         }
+      } catch (Exception e) {
+        LOGGER.error(String.format("Error receiving data from mailbox %s", server), e);
       }
     }
-    if (System.nanoTime() >= timeoutWatermark) {
-      LOGGER.error("Timed out after polling mailboxes: {}", _sendingStageInstances);
-      return TransferableBlockUtils.getErrorTransferableBlock(QueryException.EXECUTION_TIMEOUT_ERROR);
-    } else {
-      return TransferableBlockUtils.getEndOfStreamTransferableBlock(_dataSchema);
-    }
+
+    // if we opened at least one mailbox, but still got to this point, then that means
+    // all the mailboxes we opened returned null but were not yet closed - early terminate
+    // with a noop block. Otherwise, we have exhausted all data from all mailboxes and can
+    // return EOS
+    return openMailboxCount > 0 && (openMailboxCount != eosCount)

Review Comment:
   This condition is a bit hard for me to validate. Can't we just use `openMailboxCount > 0`?
   IIUC, the second clause only matters when every mailbox that was still open returns EOS (and closes) in this same pass, and it saves one more call to getNextBlock() that would only return an EOS, yes?
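   To check the condition, here is a reduced model of the loop in the diff. `RoundRobinReceiveSketch` and `MailboxStub` are hypothetical; here `receive()` is non-blocking (the diff notes the real one may block for 100ms), returns null when a mailbox is empty, and closes the mailbox once it hands out EOS. Under these assumptions, the second clause makes the operator return EOS in the same call in which every still-open mailbox delivers its EOS, instead of a NOOP followed by an EOS on the next call.

   ```java
   import java.util.ArrayDeque;
   import java.util.Collections;
   import java.util.Deque;
   import java.util.List;

   // Hypothetical model of the polling loop in the diff; not Pinot's MailboxReceiveOperator.
   final class RoundRobinReceiveSketch {
     static final String NOOP = "NOOP";
     static final String EOS = "EOS";

     /** Stand-in for ReceivingMailbox: non-blocking, and closed once it has handed out EOS. */
     static final class MailboxStub {
       private final Deque<String> _pending = new ArrayDeque<>();
       private boolean _closed = false;

       MailboxStub(String... blocks) {
         Collections.addAll(_pending, blocks);
       }

       boolean isClosed() {
         return _closed;
       }

       String receive() {
         String block = _pending.poll(); // null when nothing has arrived yet
         if (EOS.equals(block)) {
           _closed = true;
         }
         return block;
       }
     }

     private final List<MailboxStub> _mailboxes;
     private int _serverIdx = 0;

     RoundRobinReceiveSketch(List<MailboxStub> mailboxes) {
       _mailboxes = mailboxes;
     }

     String getNextBlock() {
       int startingIdx = _serverIdx;
       int openMailboxCount = 0;
       int eosCount = 0;
       for (int i = 0; i < _mailboxes.size(); i++) {
         _serverIdx = (startingIdx + i) % _mailboxes.size();
         MailboxStub mailbox = _mailboxes.get(_serverIdx);
         if (!mailbox.isClosed()) {
           openMailboxCount++;
           String block = mailbox.receive();
           if (block != null) {
             if (!EOS.equals(block)) {
               return block; // data: return immediately
             }
             eosCount++; // this mailbox just finished
           }
         }
       }
       // Same condition as the diff: NOOP only if some open mailbox returned nothing in this pass.
       return openMailboxCount > 0 && openMailboxCount != eosCount ? NOOP : EOS;
     }
   }
   ```

   As in the diff, `_serverIdx` is left pointing at the mailbox that just returned data, so the next call starts polling from that same mailbox.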




[GitHub] [pinot] walterddr commented on a diff in pull request #9711: [multistage] partial operator chain execution

Posted by GitBox <gi...@apache.org>.
walterddr commented on code in PR #9711:
URL: https://github.com/apache/pinot/pull/9711#discussion_r1014430540


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java:
##########
@@ -137,25 +137,31 @@ public static List<DataTable> reduceMailboxReceive(MailboxReceiveOperator mailbo
           throw new RuntimeException("Received error query execution result block: "
               + transferableBlock.getDataBlock().getExceptions());
       }
+      if (transferableBlock.isNoOpBlock()) {
+        continue;
+      } else if (transferableBlock.isEndOfStreamBlock()) {
+        return resultDataBlocks;
+      }

Review Comment:
   :+1:




[GitHub] [pinot] codecov-commenter commented on pull request #9711: [multistage] partial operator chain execution

Posted by GitBox <gi...@apache.org>.
codecov-commenter commented on PR #9711:
URL: https://github.com/apache/pinot/pull/9711#issuecomment-1304264414

   # [Codecov](https://codecov.io/gh/apache/pinot/pull/9711?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#9711](https://codecov.io/gh/apache/pinot/pull/9711?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (2f429d9) into [master](https://codecov.io/gh/apache/pinot/commit/8b03dee7adcbee9c8d92b76a1597ae804f150383?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (8b03dee) will **decrease** coverage by `10.22%`.
   > The diff coverage is `0.00%`.
   
   > :exclamation: Current head 2f429d9 differs from pull request most recent head 3ef8734. Consider uploading reports for the commit 3ef8734 to get more accurate results
   
   ```diff
   @@              Coverage Diff              @@
   ##             master    #9711       +/-   ##
   =============================================
   - Coverage     34.81%   24.59%   -10.23%     
   + Complexity      190       53      -137     
   =============================================
     Files          1950     1939       -11     
     Lines        104313   104203      -110     
     Branches      15802    15792       -10     
   =============================================
   - Hits          36320    25632    -10688     
   - Misses        64896    75971    +11075     
   + Partials       3097     2600      -497     
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | integration1 | `?` | |
   | integration2 | `24.59% <0.00%> (?)` | |
   | unittests2 | `?` | |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/pinot/pull/9711?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [.../apache/pinot/common/datablock/DataBlockUtils.java](https://codecov.io/gh/apache/pinot/pull/9711/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vZGF0YWJsb2NrL0RhdGFCbG9ja1V0aWxzLmphdmE=) | `0.00% <0.00%> (-41.13%)` | :arrow_down: |
   | [...g/apache/pinot/common/datablock/MetadataBlock.java](https://codecov.io/gh/apache/pinot/pull/9711/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vZGF0YWJsb2NrL01ldGFkYXRhQmxvY2suamF2YQ==) | `0.00% <0.00%> (-45.46%)` | :arrow_down: |
   | [...va/org/apache/pinot/query/runtime/QueryRunner.java](https://codecov.io/gh/apache/pinot/pull/9711/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcXVlcnktcnVudGltZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvcnVudGltZS9RdWVyeVJ1bm5lci5qYXZh) | `0.00% <0.00%> (-81.56%)` | :arrow_down: |
   | [.../pinot/query/runtime/blocks/TransferableBlock.java](https://codecov.io/gh/apache/pinot/pull/9711/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcXVlcnktcnVudGltZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvcnVudGltZS9ibG9ja3MvVHJhbnNmZXJhYmxlQmxvY2suamF2YQ==) | `0.00% <0.00%> (-67.40%)` | :arrow_down: |
   | [...t/query/runtime/blocks/TransferableBlockUtils.java](https://codecov.io/gh/apache/pinot/pull/9711/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcXVlcnktcnVudGltZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvcnVudGltZS9ibG9ja3MvVHJhbnNmZXJhYmxlQmxvY2tVdGlscy5qYXZh) | `0.00% <0.00%> (-90.48%)` | :arrow_down: |
   | [...ot/query/runtime/executor/WorkerQueryExecutor.java](https://codecov.io/gh/apache/pinot/pull/9711/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcXVlcnktcnVudGltZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvcnVudGltZS9leGVjdXRvci9Xb3JrZXJRdWVyeUV4ZWN1dG9yLmphdmE=) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...inot/query/runtime/operator/AggregateOperator.java](https://codecov.io/gh/apache/pinot/pull/9711/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcXVlcnktcnVudGltZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvcnVudGltZS9vcGVyYXRvci9BZ2dyZWdhdGVPcGVyYXRvci5qYXZh) | `0.00% <0.00%> (-84.22%)` | :arrow_down: |
   | [...e/pinot/query/runtime/operator/FilterOperator.java](https://codecov.io/gh/apache/pinot/pull/9711/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcXVlcnktcnVudGltZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvcnVudGltZS9vcGVyYXRvci9GaWx0ZXJPcGVyYXRvci5qYXZh) | `0.00% <0.00%> (-60.00%)` | :arrow_down: |
   | [...pinot/query/runtime/operator/HashJoinOperator.java](https://codecov.io/gh/apache/pinot/pull/9711/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcXVlcnktcnVudGltZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvcnVudGltZS9vcGVyYXRvci9IYXNoSm9pbk9wZXJhdG9yLmphdmE=) | `0.00% <0.00%> (-82.61%)` | :arrow_down: |
   | [...t/query/runtime/operator/LiteralValueOperator.java](https://codecov.io/gh/apache/pinot/pull/9711/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcXVlcnktcnVudGltZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvcnVudGltZS9vcGVyYXRvci9MaXRlcmFsVmFsdWVPcGVyYXRvci5qYXZh) | `0.00% <0.00%> (-57.90%)` | :arrow_down: |
   | ... and [664 more](https://codecov.io/gh/apache/pinot/pull/9711/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   



[GitHub] [pinot] agavra commented on a diff in pull request #9711: [multistage] partial operator chain execution

Posted by GitBox <gi...@apache.org>.
agavra commented on code in PR #9711:
URL: https://github.com/apache/pinot/pull/9711#discussion_r1014154668


##########
pinot-common/src/main/java/org/apache/pinot/common/datablock/MetadataBlock.java:
##########
@@ -18,28 +18,118 @@
  */
 package org.apache.pinot.common.datablock;
 
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import org.apache.pinot.common.utils.DataSchema;
 
 
 /**
- * Wrapper for row-wise data table. It stores data in row-major format.
+ * A block type to indicate some metadata about the current processing state.
+ * For the different types of metadata blocks see {@link MetadataBlockType}.
  */
 public class MetadataBlock extends BaseDataBlock {
-  private static final int VERSION = 1;
 
-  public MetadataBlock() {
-    super(0, null, new String[0], new byte[]{0}, new byte[]{0});
+  private static final ObjectMapper JSON = new ObjectMapper();
+
+  @VisibleForTesting
+  static final int VERSION = 1;
+
+  public enum MetadataBlockType {
+    /**
+     * Indicates that this block is the final block to be sent
+     * (End Of Stream) as part of an operator chain computation.
+     */
+    EOS,
+
+    /**
+     * An {@code ERROR} metadata block indicates that there was
+     * some error during computation. To retrieve the error that
+     * occurred, use {@link MetadataBlock#getExceptions()}
+     */
+    ERROR,
+
+    /**
+     * A {@code NOOP} metadata block can be sent at any point to
+     * and should be ignored by downstream - it is often used to
+     * indicate that the operator chain either has nothing to process
+     * or has processed data but is not yet ready to emit a result
+     * block.
+     */
+    NOOP;
+
+    MetadataBlockType() {
+    }
   }
 
-  public MetadataBlock(DataSchema dataSchema) {
-    super(0, dataSchema, new String[0], new byte[]{0}, new byte[]{0});
+  /**
+   * Used to serialize the contents of the metadata block conveniently and in
+   * a backwards compatible way. Use JSON because the performance of metadata block
+   * SerDe should not be a bottleneck.
+   */
+  @JsonIgnoreProperties(ignoreUnknown = true)
+  @VisibleForTesting
+  static class Contents {
+
+    private String _type;
+
+    @JsonCreator
+    public Contents(@JsonProperty("type") String type) {
+      _type = type;
+    }
+
+    @JsonCreator
+    public Contents() {
+      _type = null;
+    }
+
+    public String getType() {
+      return _type;
+    }
+
+    public void setType(String type) {
+      _type = type;
+    }
+  }
+
+  private final Contents _contents;
+
+  public MetadataBlock(MetadataBlockType type) {
+    super(0, null, new String[0], new byte[]{0}, toContents(new Contents(type.name())));
+    _contents = new Contents(type.name());
+  }
+
+  private static byte[] toContents(Contents type) {
+    try {
+      return JSON.writeValueAsBytes(type);
+    } catch (JsonProcessingException e) {
+      throw new RuntimeException(e);
+    }
   }
 
   public MetadataBlock(ByteBuffer byteBuffer)
       throws IOException {
     super(byteBuffer);
+    if (_variableSizeDataBytes != null) {
+      _contents = JSON.readValue(_variableSizeDataBytes, Contents.class);
+    } else {
+      _contents = new Contents();
+    }
+  }
+
+  public MetadataBlockType getType() {
+    String type = _contents.getType();
+
+    // if type is null, then we're reading a legacy block where we didn't encode any
+    // data. assume that it is an EOS block if there's no exceptions and an ERROR block
+    // otherwise
+    return type == null
+        ? (getExceptions().isEmpty() ? MetadataBlockType.EOS : MetadataBlockType.ERROR)
+        : MetadataBlockType.valueOf(type);

Review Comment:
   I was very careful to make sure that it won't throw when deserializing (see the tests). Is there any reason we don't need to consider backwards compatibility? If we don't, I can make the serialization format less hacky!




[GitHub] [pinot] agavra commented on a diff in pull request #9711: [multistage] partial operator chain execution

Posted by GitBox <gi...@apache.org>.
agavra commented on code in PR #9711:
URL: https://github.com/apache/pinot/pull/9711#discussion_r1014429188


##########
pinot-common/src/main/java/org/apache/pinot/common/datablock/MetadataBlock.java:
##########
@@ -18,28 +18,118 @@
  */
 package org.apache.pinot.common.datablock;
 
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import org.apache.pinot.common.utils.DataSchema;
 
 
 /**
- * Wrapper for row-wise data table. It stores data in row-major format.
+ * A block type to indicate some metadata about the current processing state.
+ * For the different types of metadata blocks see {@link MetadataBlockType}.
  */
 public class MetadataBlock extends BaseDataBlock {
-  private static final int VERSION = 1;
 
-  public MetadataBlock() {
-    super(0, null, new String[0], new byte[]{0}, new byte[]{0});
+  private static final ObjectMapper JSON = new ObjectMapper();
+
+  @VisibleForTesting
+  static final int VERSION = 1;
+
+  public enum MetadataBlockType {
+    /**
+     * Indicates that this block is the final block to be sent
+     * (End Of Stream) as part of an operator chain computation.
+     */
+    EOS,
+
+    /**
+     * An {@code ERROR} metadata block indicates that there was
+     * some error during computation. To retrieve the error that
+     * occurred, use {@link MetadataBlock#getExceptions()}
+     */
+    ERROR,
+
+    /**
+     * A {@code NOOP} metadata block can be sent at any point to
+     * and should be ignored by downstream - it is often used to
+     * indicate that the operator chain either has nothing to process
+     * or has processed data but is not yet ready to emit a result
+     * block.
+     */
+    NOOP;
+
+    MetadataBlockType() {
+    }
   }
 
-  public MetadataBlock(DataSchema dataSchema) {
-    super(0, dataSchema, new String[0], new byte[]{0}, new byte[]{0});
+  /**
+   * Used to serialize the contents of the metadata block conveniently and in
+   * a backwards compatible way. Use JSON because the performance of metadata block
+   * SerDe should not be a bottleneck.
+   */
+  @JsonIgnoreProperties(ignoreUnknown = true)
+  @VisibleForTesting
+  static class Contents {
+
+    private String _type;
+
+    @JsonCreator
+    public Contents(@JsonProperty("type") String type) {
+      _type = type;
+    }
+
+    @JsonCreator
+    public Contents() {
+      _type = null;
+    }
+
+    public String getType() {
+      return _type;
+    }
+
+    public void setType(String type) {
+      _type = type;
+    }
+  }
+
+  private final Contents _contents;
+
+  public MetadataBlock(MetadataBlockType type) {
+    super(0, null, new String[0], new byte[]{0}, toContents(new Contents(type.name())));
+    _contents = new Contents(type.name());
+  }
+
+  private static byte[] toContents(Contents type) {
+    try {
+      return JSON.writeValueAsBytes(type);
+    } catch (JsonProcessingException e) {
+      throw new RuntimeException(e);
+    }
   }
 
   public MetadataBlock(ByteBuffer byteBuffer)
       throws IOException {
     super(byteBuffer);
+    if (_variableSizeDataBytes != null) {
+      _contents = JSON.readValue(_variableSizeDataBytes, Contents.class);
+    } else {
+      _contents = new Contents();
+    }
+  }
+
+  public MetadataBlockType getType() {
+    String type = _contents.getType();
+
+    // if type is null, then we're reading a legacy block where we didn't encode any
+    // data. assume that it is an EOS block if there's no exceptions and an ERROR block
+    // otherwise
+    return type == null
+        ? (getExceptions().isEmpty() ? MetadataBlockType.EOS : MetadataBlockType.ERROR)
+        : MetadataBlockType.valueOf(type);

Review Comment:
   > if the metadata block transferred over the wire is of previous version. then using the current version of the code it cannot reconstruct a metadata block back from the byteBuffer
   
   That's not correct: it can decode the byteBuffer. The only difference is that it will read it with an empty `_variableSizeDataBytes`, which means the JSON contents will be empty (see `MetadataBlockTest`).
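The following is a minimal sketch of the fallback described above. It assumes a simplified block that holds only a nullable encoded type and an exception map. `BlockType` and `LegacyAwareMetadata` are hypothetical stand-ins for illustration, not Pinot's `MetadataBlock`. The inference mirrors the `getType()` logic in the diff: a block with no encoded type is treated as ERROR if it carries exceptions and as EOS otherwise.

```java
import java.util.Map;

// Hypothetical mirror of MetadataBlock.MetadataBlockType from the diff.
enum BlockType { EOS, ERROR, NOOP }

// Hypothetical simplified metadata block: a nullable encoded type plus the
// exception map that legacy EOS/ERROR blocks already carried.
final class LegacyAwareMetadata {
  private final String _encodedType; // null when decoded from a legacy block
  private final Map<Integer, String> _exceptions;

  LegacyAwareMetadata(String encodedType, Map<Integer, String> exceptions) {
    _encodedType = encodedType;
    _exceptions = exceptions;
  }

  BlockType getType() {
    if (_encodedType == null) {
      // Legacy block: no JSON contents were written, so infer the type the way
      // the old code did (exceptions present => ERROR, otherwise EOS).
      return _exceptions.isEmpty() ? BlockType.EOS : BlockType.ERROR;
    }
    return BlockType.valueOf(_encodedType);
  }
}
```

Because the fallback only applies when no type was encoded, new block types such as NOOP can be added without changing how old senders' blocks are read.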





[GitHub] [pinot] walterddr commented on a diff in pull request #9711: [multistage] partial operator chain execution

Posted by GitBox <gi...@apache.org>.
walterddr commented on code in PR #9711:
URL: https://github.com/apache/pinot/pull/9711#discussion_r1014414759


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java:
##########
@@ -91,6 +93,7 @@ public AggregateOperator(Operator<TransferableBlock> inputOperator, DataSchema d
     }
     _resultSchema = dataSchema;
 
+    _readyToConstruct = false;
     _isCumulativeBlockConstructed = false;

Review Comment:
   ah..... 
   good point!!!





[GitHub] [pinot] agavra commented on a diff in pull request #9711: [multistage] partial operator chain execution

Posted by GitBox <gi...@apache.org>.
agavra commented on code in PR #9711:
URL: https://github.com/apache/pinot/pull/9711#discussion_r1014392934


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java:
##########
@@ -137,25 +137,31 @@ public static List<DataTable> reduceMailboxReceive(MailboxReceiveOperator mailbo
           throw new RuntimeException("Received error query execution result block: "
               + transferableBlock.getDataBlock().getExceptions());
       }
+      if (transferableBlock.isNoOpBlock()) {
+        continue;
+      } else if (transferableBlock.isEndOfStreamBlock()) {
+        return resultDataBlocks;
+      }

Review Comment:
   hmm, looking at the code I don't think `TransferableBlock#getDataBlock` can ever return `null` - if the field is null, it either builds the block or throws an exception, otherwise it returns the field.
   
   I'll remove this check altogether since I think it's just misleading. I still think we shouldn't be adding metadata blocks to the result table.
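To illustrate the intended behavior, here is a sketch of a reduce loop that never adds metadata blocks to the result. `Kind`, `Block`, and `Reducer` are hypothetical types, not Pinot's `TransferableBlock` or `QueryDispatcher` API. The loop skips NOOP blocks, fails the query on an ERROR block, stops at EOS, and collects only data payloads.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical block kinds; DATA stands in for a regular result block.
enum Kind { DATA, NOOP, EOS, ERROR }

// Hypothetical block holder, not Pinot's TransferableBlock.
final class Block {
  final Kind kind;
  final String payload;

  Block(Kind kind, String payload) {
    this.kind = kind;
    this.payload = payload;
  }
}

final class Reducer {
  // Collects data payloads until EOS; metadata blocks never enter the result.
  static List<String> reduce(Iterator<Block> source) {
    List<String> results = new ArrayList<>();
    while (source.hasNext()) {
      Block block = source.next();
      switch (block.kind) {
        case ERROR:
          throw new RuntimeException("Received error block: " + block.payload);
        case NOOP:
          continue; // nothing available yet; poll again
        case EOS:
          return results;
        default:
          results.add(block.payload);
      }
    }
    return results;
  }
}
```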





[GitHub] [pinot] walterddr commented on a diff in pull request #9711: [multistage] partial operator chain execution

Posted by GitBox <gi...@apache.org>.
walterddr commented on code in PR #9711:
URL: https://github.com/apache/pinot/pull/9711#discussion_r1013388970


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java:
##########
@@ -116,6 +119,11 @@ public String toExplainString() {
   protected TransferableBlock getNextBlock() {
     try {
       consumeInputBlocks();
+
+      if (!_readyToConstruct) {

Review Comment:
   +1 no problem 





[GitHub] [pinot] agavra commented on a diff in pull request #9711: [multistage] partial operator chain execution

Posted by GitBox <gi...@apache.org>.
agavra commented on code in PR #9711:
URL: https://github.com/apache/pinot/pull/9711#discussion_r1013371302


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java:
##########
@@ -116,6 +119,11 @@ public String toExplainString() {
   protected TransferableBlock getNextBlock() {
     try {
       consumeInputBlocks();
+
+      if (!_readyToConstruct) {

Review Comment:
   I played around with this and couldn't easily come up with something that I liked significantly better than just inlining the logic. 
   
   The reasons were:
   1. some stateful operators have different logic (e.g. HashJoin needs to build the broadcast table first, pulling from the right side, and then becomes a 1:1 operator)
   2. the state transition couldn't happen in the parent class, the child class still had to track when to transition itself, which means we still held onto state in the child class
   3. it becomes harder to read when the state transition logic is split across two classes
   
   I think for this PR, I'll keep the classes as is and we can try to refactor and make it easier to read in a separate PR.
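Point 1 describes an operator whose input/output shape changes mid-stream. The sketch below illustrates it with two hypothetical classes, `Step` and `SketchHashJoin`, which are not Pinot's `HashJoinOperator`. In the build phase, each call consumes one right-side block and emits NOOP until the right side reaches EOS. In the probe phase, each call reads one left-side block and emits one block (1:1). The phase flag still lives in the operator itself, which is the state-tracking concern raised in item 2.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical upstream result: a batch of join keys, or a NOOP/EOS signal.
final class Step {
  static final Step NOOP = new Step(null, false);
  static final Step EOS = new Step(null, true);

  final List<Integer> keys; // null for NOOP and EOS
  final boolean eos;

  private Step(List<Integer> keys, boolean eos) {
    this.keys = keys;
    this.eos = eos;
  }

  static Step data(List<Integer> keys) {
    return new Step(keys, false);
  }

  boolean isNoop() {
    return keys == null && !eos;
  }
}

// Hypothetical inner join on integer keys that builds from the right side first.
final class SketchHashJoin {
  private final Iterator<Step> _left;
  private final Iterator<Step> _right;
  private final Map<Integer, Integer> _rightKeyCounts = new HashMap<>();
  private boolean _buildComplete = false;

  SketchHashJoin(Iterator<Step> left, Iterator<Step> right) {
    _left = left;
    _right = right;
  }

  Step nextBlock() {
    if (!_buildComplete) {
      // Build phase (1 block in, 0 blocks out): read only from the right side.
      Step right = _right.next();
      if (right.eos) {
        _buildComplete = true; // the next call starts probing
      } else if (!right.isNoop()) {
        for (int key : right.keys) {
          _rightKeyCounts.merge(key, 1, Integer::sum);
        }
      }
      return Step.NOOP;
    }
    // Probe phase (1 block in, 1 block out): read only from the left side.
    Step left = _left.next();
    if (left.eos || left.isNoop()) {
      return left; // pass signals through unchanged
    }
    List<Integer> joined = new ArrayList<>();
    for (int key : left.keys) {
      int matches = _rightKeyCounts.getOrDefault(key, 0);
      for (int i = 0; i < matches; i++) {
        joined.add(key);
      }
    }
    return Step.data(joined);
  }
}
```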





[GitHub] [pinot] walterddr commented on a diff in pull request #9711: [multistage] partial operator chain execution

Posted by GitBox <gi...@apache.org>.
walterddr commented on code in PR #9711:
URL: https://github.com/apache/pinot/pull/9711#discussion_r1014429953


##########
pinot-common/src/main/java/org/apache/pinot/common/datablock/MetadataBlock.java:
##########
+  public MetadataBlockType getType() {
+    String type = _contents.getType();
+
+    // if type is null, then we're reading a legacy block where we didn't encode any
+    // data. assume that it is an EOS block if there's no exceptions and an ERROR block
+    // otherwise
+    return type == null
+        ? (getExceptions().isEmpty() ? MetadataBlockType.EOS : MetadataBlockType.ERROR)
+        : MetadataBlockType.valueOf(type);

Review Comment:
   ah... ok. yeah in that case we should keep this. thanks for the explanation





[GitHub] [pinot] agavra commented on a diff in pull request #9711: [multistage] partial operator chain execution

Posted by GitBox <gi...@apache.org>.
agavra commented on code in PR #9711:
URL: https://github.com/apache/pinot/pull/9711#discussion_r1013372210


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java:
##########
@@ -116,6 +119,11 @@ public String toExplainString() {
   protected TransferableBlock getNextBlock() {
     try {
       consumeInputBlocks();
+
+      if (!_readyToConstruct) {
+        return TransferableBlockUtils.getNoOpTransferableBlock();
+      }
+
       return produceAggregatedBlock();

Review Comment:
   See comment below: it isn't straightforward to make this clean (there isn't even a clean way to get the child block, because you might have to know whether to read from the left or right side in the case of a join, or whether to read at all when the operator is currently a 0->1 operator).





[GitHub] [pinot] agavra commented on a diff in pull request #9711: [multistage] partial operator chain execution

Posted by GitBox <gi...@apache.org>.
agavra commented on code in PR #9711:
URL: https://github.com/apache/pinot/pull/9711#discussion_r1013372916


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java:
##########
@@ -137,25 +137,31 @@ public static List<DataTable> reduceMailboxReceive(MailboxReceiveOperator mailbo
           throw new RuntimeException("Received error query execution result block: "
               + transferableBlock.getDataBlock().getExceptions());
       }
+      if (transferableBlock.isNoOpBlock()) {
+        continue;
+      } else if (transferableBlock.isEndOfStreamBlock()) {
+        return resultDataBlocks;
+      }

Review Comment:
   I don't think so, we shouldn't be adding noops/eos to the results - right?





[GitHub] [pinot] walterddr commented on a diff in pull request #9711: [multistage] partial operator chain execution

Posted by GitBox <gi...@apache.org>.
walterddr commented on code in PR #9711:
URL: https://github.com/apache/pinot/pull/9711#discussion_r1014152556


##########
pinot-common/src/main/java/org/apache/pinot/common/datablock/MetadataBlock.java:
##########
+  public MetadataBlockType getType() {
+    String type = _contents.getType();
+
+    // if type is null, then we're reading a legacy block where we didn't encode any
+    // data. assume that it is an EOS block if there's no exceptions and an ERROR block
+    // otherwise
+    return type == null
+        ? (getExceptions().isEmpty() ? MetadataBlockType.EOS : MetadataBlockType.ERROR)
+        : MetadataBlockType.valueOf(type);

Review Comment:
   We don't need to consider backward compatibility here: if the type is null, that means it is a legacy block, and you will throw when deserializing anyway.





[GitHub] [pinot] agavra commented on pull request #9711: [multistage] partial operator chain execution

Posted by GitBox <gi...@apache.org>.
agavra commented on PR #9711:
URL: https://github.com/apache/pinot/pull/9711#issuecomment-1304590330

   Had to rebase after #9729 and #9676. @walterddr, the changes are in `AggregateOperator`, `HashJoinOperator`, and the corresponding tests if you want to review just those files.




[GitHub] [pinot] walterddr commented on a diff in pull request #9711: [multistage] partial operator chain execution

Posted by GitBox <gi...@apache.org>.
walterddr commented on code in PR #9711:
URL: https://github.com/apache/pinot/pull/9711#discussion_r1012960840


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java:
##########
@@ -116,6 +119,11 @@ public String toExplainString() {
   protected TransferableBlock getNextBlock() {
     try {
       consumeInputBlocks();
+
+      if (!_readyToConstruct) {

Review Comment:
   I was wondering what would be the best way to model the "stop-the-world" and "passthrough" operators?
   
   my thinking is: model all operators as "take 0 - 1 block in, produce 0 - 1 block out"
   
   e.g. when a chain calls `nextBlock()`:
   - when it hits an agg operator that hasn't seen an EOS, it is a 1 block in, 0 block out operator
   - when it hits an agg operator that has seen an EOS, it is a 0 block in, 1 block out operator
   - when it hits a filter operator, it is a 1 block in, 0 - 1 block out operator
   - when it hits a project operator, it is a 1 block in, 1 block out operator
   
   with this design each operator is clearly a state machine and we don't have to maintain special boolean flags on each impl, what do you think?
   
   (0 block out as No-op block out); 
   (0 block in as not calling getNextBlock of the upstream operator)
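Here is a sketch of the proposed model applied to a stateful aggregate. It assumes a hypothetical SUM over single-value blocks. `Signal`, `Msg`, and `SumAggregate` are illustrative names, not Pinot classes, and the code shows the proposal rather than what the PR implements. An explicit `State` enum replaces the boolean flags. While accumulating, each call takes 1 block in and emits a NOOP (0 blocks out). After upstream EOS, the next call takes 0 blocks in and emits the single result.

```java
import java.util.Iterator;

enum Signal { DATA, NOOP, EOS }

// Hypothetical block: a signal plus a single long payload for DATA blocks.
final class Msg {
  static final Msg NOOP = new Msg(Signal.NOOP, 0);
  static final Msg EOS = new Msg(Signal.EOS, 0);

  final Signal signal;
  final long value;

  private Msg(Signal signal, long value) {
    this.signal = signal;
    this.value = value;
  }

  static Msg data(long value) {
    return new Msg(Signal.DATA, value);
  }
}

// Hypothetical SUM aggregate modeled as an explicit state machine.
final class SumAggregate {
  private enum State { ACCUMULATING, EMITTING, DONE }

  private final Iterator<Msg> _upstream;
  private State _state = State.ACCUMULATING;
  private long _sum = 0;

  SumAggregate(Iterator<Msg> upstream) {
    _upstream = upstream;
  }

  Msg nextBlock() {
    switch (_state) {
      case ACCUMULATING: {
        // 1 block in, 0 blocks out: consume one upstream block, emit NOOP.
        Msg in = _upstream.next();
        if (in.signal == Signal.DATA) {
          _sum += in.value;
        } else if (in.signal == Signal.EOS) {
          _state = State.EMITTING;
        }
        return Msg.NOOP;
      }
      case EMITTING:
        // 0 blocks in, 1 block out: the upstream operator is not called.
        _state = State.DONE;
        return Msg.data(_sum);
      default:
        return Msg.EOS;
    }
  }
}
```

Each transition is visible in one `switch`. Adding a phase, such as the build/probe split of a hash join, means adding an enum constant rather than another flag.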
   





[GitHub] [pinot] agavra commented on a diff in pull request #9711: [multistage] partial operator chain execution

Posted by GitBox <gi...@apache.org>.
agavra commented on code in PR #9711:
URL: https://github.com/apache/pinot/pull/9711#discussion_r1014386219


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java:
##########
@@ -111,37 +114,46 @@ public String toExplainString() {
   protected TransferableBlock getNextBlock() {
     if (_upstreamErrorBlock != null) {
       return _upstreamErrorBlock;
+    } else if (System.nanoTime() >= _timeout) {
+      LOGGER.error("Timed out after polling mailboxes: {}", _sendingStageInstances);
+      return TransferableBlockUtils.getErrorTransferableBlock(QueryException.EXECUTION_TIMEOUT_ERROR);
     }
-    // TODO: do a round robin check against all MailboxContentStreamObservers and find which one that has data.
-    boolean hasOpenedMailbox = true;
-    long timeoutWatermark = System.nanoTime() + _timeout;
-    while (hasOpenedMailbox && System.nanoTime() < timeoutWatermark) {
-      hasOpenedMailbox = false;
-      for (ServerInstance sendingInstance : _sendingStageInstances) {
-        try {
-          ReceivingMailbox<TransferableBlock> receivingMailbox =
-              _mailboxService.getReceivingMailbox(toMailboxId(sendingInstance));
-          // TODO this is not threadsafe.
-          // make sure only one thread is checking receiving mailbox and calling receive() then close()
-          if (!receivingMailbox.isClosed()) {
-            hasOpenedMailbox = true;
-            TransferableBlock transferableBlock = receivingMailbox.receive();
-            if (transferableBlock != null && !transferableBlock.isEndOfStreamBlock()) {
-              // Return the block only if it has some valid data
-              return transferableBlock;
+
+    int startingIdx = _serverIdx;
+    int openMailboxCount = 0;
+    int eosCount = 0;
+
+    for (int i = 0; i < _sendingStageInstances.size(); i++) {
+      // this implements a round-robin mailbox iterator so we don't starve any mailboxes
+      _serverIdx = (startingIdx + i) % _sendingStageInstances.size();
+
+      ServerInstance server = _sendingStageInstances.get(_serverIdx);
+      try {
+        ReceivingMailbox<TransferableBlock> mailbox = _mailboxService.getReceivingMailbox(toMailboxId(server));
+        if (!mailbox.isClosed()) {
+          openMailboxCount++;
+
+          // this is blocking for 100ms and may return null
+          TransferableBlock block = mailbox.receive();
+          if (block != null) {
+            if (!block.isEndOfStreamBlock()) {
+              return block;
             }
+            eosCount++;
           }
-        } catch (Exception e) {
-          LOGGER.error(String.format("Error receiving data from mailbox %s", sendingInstance), e);
         }
+      } catch (Exception e) {
+        LOGGER.error(String.format("Error receiving data from mailbox %s", server), e);
       }
     }
-    if (System.nanoTime() >= timeoutWatermark) {
-      LOGGER.error("Timed out after polling mailboxes: {}", _sendingStageInstances);
-      return TransferableBlockUtils.getErrorTransferableBlock(QueryException.EXECUTION_TIMEOUT_ERROR);
-    } else {
-      return TransferableBlockUtils.getEndOfStreamTransferableBlock(_dataSchema);
-    }
+
+    // if we opened at least one mailbox, but still got to this point, then that means
+    // all the mailboxes we opened returned null but were not yet closed - early terminate
+    // with a noop block. Otherwise, we have exhausted all data from all mailboxes and can
+    // return EOS
+    return openMailboxCount > 0 && (openMailboxCount != eosCount)

Review Comment:
   Yeah, this condition isn't necessary; it's technically an optimization to avoid needing another call. I'll remove it (at first I thought it was necessary, but that was actually a different bug I was figuring out).
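Below is a simplified, single-threaded sketch of the round-robin poll in the diff, with the shortcut removed. `FakeMailbox` and `RoundRobinReceiver` are hypothetical. Blocks are plain strings, with `"EOS"` and `"NOOP"` reserved as markers, and the timeout check is omitted. As in the diff, a NOOP is returned whenever some mailbox was still open during the poll. After the last mailbox delivers its EOS, one more call is needed to see that every mailbox is closed. One deliberate variation: this sketch resumes after the mailbox that just returned data. In the diff, `_serverIdx` still points at that mailbox, so the next call starts from it again.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

// Hypothetical in-memory mailbox; receive() returns null when nothing is queued.
final class FakeMailbox {
  static final String EOS = "EOS";

  private final Deque<String> _queue = new ArrayDeque<>();
  private boolean _closed = false;

  FakeMailbox offer(String block) {
    _queue.add(block);
    return this;
  }

  boolean isClosed() {
    return _closed;
  }

  String receive() {
    String block = _queue.poll();
    if (EOS.equals(block)) {
      _closed = true; // the sender is done; later polls skip this mailbox
    }
    return block;
  }
}

// Hypothetical receiver that polls each open mailbox at most once per call.
final class RoundRobinReceiver {
  static final String NOOP = "NOOP";

  private final List<FakeMailbox> _mailboxes;
  private int _nextIdx = 0;

  RoundRobinReceiver(List<FakeMailbox> mailboxes) {
    _mailboxes = mailboxes;
  }

  String nextBlock() {
    int size = _mailboxes.size();
    boolean anyOpen = false;
    for (int i = 0; i < size; i++) {
      int idx = (_nextIdx + i) % size;
      FakeMailbox mailbox = _mailboxes.get(idx);
      if (mailbox.isClosed()) {
        continue;
      }
      anyOpen = true;
      String block = mailbox.receive();
      if (block != null && !FakeMailbox.EOS.equals(block)) {
        _nextIdx = (idx + 1) % size; // resume after this mailbox next time
        return block;
      }
    }
    // No data this round: NOOP if a mailbox was open when polled, else EOS.
    return anyOpen ? NOOP : FakeMailbox.EOS;
  }
}
```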





[GitHub] [pinot] agavra commented on pull request #9711: [multistage] partial operator chain execution

Posted by GitBox <gi...@apache.org>.
agavra commented on PR #9711:
URL: https://github.com/apache/pinot/pull/9711#issuecomment-1304249032

   @walterddr, just wanted to make sure you don't merge until tests pass; it looks like the last commit actually introduced some regressions... double-checking that now.




[GitHub] [pinot] walterddr commented on a diff in pull request #9711: [multistage] partial operator chain execution

Posted by GitBox <gi...@apache.org>.
walterddr commented on code in PR #9711:
URL: https://github.com/apache/pinot/pull/9711#discussion_r1014415808


##########
pinot-common/src/main/java/org/apache/pinot/common/datablock/MetadataBlock.java:
##########
@@ -18,28 +18,118 @@
  */
 package org.apache.pinot.common.datablock;
 
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import org.apache.pinot.common.utils.DataSchema;
 
 
 /**
- * Wrapper for row-wise data table. It stores data in row-major format.
+ * A block type to indicate some metadata about the current processing state.
+ * For the different types of metadata blocks see {@link MetadataBlockType}.
  */
 public class MetadataBlock extends BaseDataBlock {
-  private static final int VERSION = 1;
 
-  public MetadataBlock() {
-    super(0, null, new String[0], new byte[]{0}, new byte[]{0});
+  private static final ObjectMapper JSON = new ObjectMapper();
+
+  @VisibleForTesting
+  static final int VERSION = 1;
+
+  public enum MetadataBlockType {
+    /**
+     * Indicates that this block is the final block to be sent
+     * (End Of Stream) as part of an operator chain computation.
+     */
+    EOS,
+
+    /**
+     * An {@code ERROR} metadata block indicates that there was
+     * some error during computation. To retrieve the error that
+     * occurred, use {@link MetadataBlock#getExceptions()}
+     */
+    ERROR,
+
+    /**
+     * A {@code NOOP} metadata block can be sent at any point to
+     * and should be ignored by downstream - it is often used to
+     * indicate that the operator chain either has nothing to process
+     * or has processed data but is not yet ready to emit a result
+     * block.
+     */
+    NOOP;
+
+    MetadataBlockType() {
+    }
   }
 
-  public MetadataBlock(DataSchema dataSchema) {
-    super(0, dataSchema, new String[0], new byte[]{0}, new byte[]{0});
+  /**
+   * Used to serialize the contents of the metadata block conveniently and in
+   * a backwards compatible way. Use JSON because the performance of metadata block
+   * SerDe should not be a bottleneck.
+   */
+  @JsonIgnoreProperties(ignoreUnknown = true)
+  @VisibleForTesting
+  static class Contents {
+
+    private String _type;
+
+    @JsonCreator
+    public Contents(@JsonProperty("type") String type) {
+      _type = type;
+    }
+
+    @JsonCreator
+    public Contents() {
+      _type = null;
+    }
+
+    public String getType() {
+      return _type;
+    }
+
+    public void setType(String type) {
+      _type = type;
+    }
+  }
+
+  private final Contents _contents;
+
+  public MetadataBlock(MetadataBlockType type) {
+    super(0, null, new String[0], new byte[]{0}, toContents(new Contents(type.name())));
+    _contents = new Contents(type.name());
+  }
+
+  private static byte[] toContents(Contents type) {
+    try {
+      return JSON.writeValueAsBytes(type);
+    } catch (JsonProcessingException e) {
+      throw new RuntimeException(e);
+    }
   }
 
   public MetadataBlock(ByteBuffer byteBuffer)
       throws IOException {
     super(byteBuffer);
+    if (_variableSizeDataBytes != null) {
+      _contents = JSON.readValue(_variableSizeDataBytes, Contents.class);
+    } else {
+      _contents = new Contents();
+    }
+  }
+
+  public MetadataBlockType getType() {
+    String type = _contents.getType();
+
+    // if type is null, then we're reading a legacy block where we didn't encode any
+    // data. assume that it is an EOS block if there's no exceptions and an ERROR block
+    // otherwise
+    return type == null
+        ? (getExceptions().isEmpty() ? MetadataBlockType.EOS : MetadataBlockType.ERROR)
+        : MetadataBlockType.valueOf(type);

Review Comment:
   Please let me know if my understanding is correct: if the metadata block transferred over the wire is of the previous version, then the current version of the code cannot reconstruct a metadata block from the `byteBuffer` (as it will not be encoded using Jackson).
   
   In that case we will never reach a situation where the `byteBuffer` is decodable and `type` is null, correct?
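   The question turns on what happens when a pre-upgrade block reaches the new `MetadataBlock(ByteBuffer)` constructor. What follows is a minimal, dependency-free sketch, not the PR's code. It rests on several assumptions. `decodeType` and `encodeType` are hypothetical stand-ins for the Jackson calls. The exception map is assumed to be `Map<Integer, String>`. A legacy block is assumed to carry the single-byte payload `{0}` that the old constructor wrote. Under those assumptions, the sketch shows one way the `type == null` fallback could stay reachable: catch the decode failure and infer EOS/ERROR from the exceptions, instead of letting the constructor throw.

```java
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Map;

// Hypothetical model of MetadataBlock#getType() with a tolerant legacy fallback.
public class MetadataTypeResolution {

  enum MetadataBlockType { EOS, ERROR, NOOP }

  private static final String PREFIX = "{\"type\":\"";
  private static final String SUFFIX = "\"}";

  // Hypothetical stand-in for JSON.readValue(bytes, Contents.class): accepts only
  // the exact shape {"type":"<NAME>"} and throws on anything else.
  static String decodeType(byte[] bytes) {
    String s = new String(bytes, StandardCharsets.UTF_8);
    if (s.length() < PREFIX.length() + SUFFIX.length() || !s.startsWith(PREFIX) || !s.endsWith(SUFFIX)) {
      throw new IllegalArgumentException("not a JSON contents payload");
    }
    return s.substring(PREFIX.length(), s.length() - SUFFIX.length());
  }

  // Hypothetical stand-in for toContents(new Contents(type.name())).
  static byte[] encodeType(MetadataBlockType type) {
    return (PREFIX + type.name() + SUFFIX).getBytes(StandardCharsets.UTF_8);
  }

  // variableBytes: the block's variable-size data (may be null).
  // exceptions: error code -> message (assumed shape of getExceptions()).
  static MetadataBlockType resolve(byte[] variableBytes, Map<Integer, String> exceptions) {
    String type = null;
    if (variableBytes != null) {
      try {
        type = decodeType(variableBytes);
      } catch (IllegalArgumentException e) {
        // Legacy payload such as {0}: fall through to exception-based inference.
        type = null;
      }
    }
    if (type == null) {
      return exceptions.isEmpty() ? MetadataBlockType.EOS : MetadataBlockType.ERROR;
    }
    return MetadataBlockType.valueOf(type);
  }

  public static void main(String[] args) {
    byte[] legacy = new byte[]{0};
    System.out.println(resolve(legacy, Collections.<Integer, String>emptyMap()));      // EOS
    System.out.println(resolve(legacy, Collections.singletonMap(200, "query failed"))); // ERROR
    System.out.println(resolve(encodeType(MetadataBlockType.NOOP),
        Collections.<Integer, String>emptyMap()));                                     // NOOP
  }
}
```

   Whether tolerating undecodable payloads is better than rejecting legacy blocks outright depends on whether mixed-version servers must interoperate during an upgrade. The sketch only illustrates the mechanics.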





[GitHub] [pinot] walterddr merged pull request #9711: [multistage] partial operator chain execution

Posted by GitBox <gi...@apache.org>.
walterddr merged PR #9711:
URL: https://github.com/apache/pinot/pull/9711

