Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2022/11/09 02:20:45 UTC

[GitHub] [pinot] agavra opened a new pull request, #9767: [multistage] clean up AggregateOperator and add test coverage

agavra opened a new pull request, #9767:
URL: https://github.com/apache/pinot/pull/9767

   see description, this one is pretty straightforward :)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] agavra commented on a diff in pull request #9767: [multistage] clean up AggregateOperator and add test coverage

Posted by GitBox <gi...@apache.org>.
agavra commented on code in PR #9767:
URL: https://github.com/apache/pinot/pull/9767#discussion_r1018254481


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java:
##########
@@ -53,17 +56,32 @@
  * If the input is single value, the output type will be input type. Otherwise, the output type will be double.
  */
 public class AggregateOperator extends BaseOperator<TransferableBlock> {
+
+  interface Merger extends BiFunction<Object, Object, Object> {
+  }
+
   private static final String EXPLAIN_NAME = "AGGREGATE_OPERATOR";
+  private static final Map<String, Merger> MERGERS = ImmutableMap

Review Comment:
   IMO it's nice to have constants at the top of the file, but don't feel strongly. I'll move it





[GitHub] [pinot] agavra commented on a diff in pull request #9767: [multistage] clean up AggregateOperator and add test coverage

Posted by GitBox <gi...@apache.org>.
agavra commented on code in PR #9767:
URL: https://github.com/apache/pinot/pull/9767#discussion_r1017337713


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java:
##########
@@ -53,17 +56,33 @@
  * If the input is single value, the output type will be input type. Otherwise, the output type will be double.
  */
 public class AggregateOperator extends BaseOperator<TransferableBlock> {
+
+  interface Merger extends BiFunction<Object, Object, Object> {
+  }
+
   private static final String EXPLAIN_NAME = "AGGREGATE_OPERATOR";
+  private static final Map<String, Merger> MERGERS = ImmutableMap

Review Comment:
   this lets us (1) inject mergers for testing and (2) throw in the constructor instead of waiting until a block is merged if there's an unknown aggregate function.
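
A minimal sketch of the idea in this comment, not the PR's actual code: mergers are resolved from a map in the constructor, so a test can pass its own map and an unknown function fails immediately. Only `Merger` and `MERGERS` come from the diff; the class name `MergerInjectionSketch`, the two constructors, the `merge` helper, and the contents of the default map are assumptions made for illustration. The exception message mirrors the `Unexpected value: AVERAGE` pattern asserted in the test file.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Hypothetical stand-in for the operator; only Merger and MERGERS are names from the diff.
class MergerInjectionSketch {

  interface Merger extends BiFunction<Object, Object, Object> {
  }

  // Assumed default set of mergers; the real map contents may differ.
  static final Map<String, Merger> MERGERS = defaultMergers();

  private final List<Merger> _resolved = new ArrayList<>();

  // Production-style constructor: uses the default mergers.
  MergerInjectionSketch(List<String> functionNames) {
    this(functionNames, MERGERS);
  }

  // Test-visible constructor: callers can inject their own mergers.
  MergerInjectionSketch(List<String> functionNames, Map<String, Merger> mergers) {
    for (String name : functionNames) {
      Merger merger = mergers.get(name);
      if (merger == null) {
        // Fail at construction time rather than when the first block is merged.
        throw new IllegalStateException("Unexpected value: " + name);
      }
      _resolved.add(merger);
    }
  }

  Object merge(int callIndex, Object left, Object right) {
    return _resolved.get(callIndex).apply(left, right);
  }

  private static Map<String, Merger> defaultMergers() {
    Map<String, Merger> mergers = new HashMap<>();
    mergers.put("SUM", (a, b) -> ((Number) a).doubleValue() + ((Number) b).doubleValue());
    mergers.put("MIN", (a, b) -> Math.min(((Number) a).doubleValue(), ((Number) b).doubleValue()));
    mergers.put("MAX", (a, b) -> Math.max(((Number) a).doubleValue(), ((Number) b).doubleValue()));
    return Collections.unmodifiableMap(mergers);
  }

  public static void main(String[] args) {
    MergerInjectionSketch operator = new MergerInjectionSketch(Collections.singletonList("SUM"));
    System.out.println(operator.merge(0, 1, 2));
    try {
      new MergerInjectionSketch(Collections.singletonList("AVERAGE"));
    } catch (IllegalStateException e) {
      System.out.println(e.getMessage());
    }
  }
}
```

With this shape, a test could hand the second constructor a map containing a stub merger and check that the operator calls it, without depending on the default implementations.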



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java:
##########
@@ -123,11 +138,14 @@ public String toExplainString() {
   @Override
   protected TransferableBlock getNextBlock() {
     try {
-      if (!_readyToConstruct) {
-        consumeInputBlocks();
+      if (!_readyToConstruct && !consumeInputBlocks()) {

Review Comment:
   made a minor change here so that we go on to produce the result block, instead of returning a no-op block, when we read EOS from the input source
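
The control flow described here can be sketched roughly as follows. This is an illustration under stated assumptions, not the operator's real implementation: it assumes `consumeInputBlocks()` returns `true` once EOS has been read, models blocks as a hypothetical `Kind` enum, and returns strings in place of `TransferableBlock` instances. Only `_readyToConstruct`, `consumeInputBlocks`, and `getNextBlock` are names from the diff.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Hypothetical model of the operator's block loop; block types are simplified to an enum.
class EosFlowSketch {

  enum Kind { DATA, NO_OP, EOS }

  private final Deque<Kind> _input;
  private boolean _readyToConstruct = false;
  private boolean _resultEmitted = false;
  private int _rowsSeen = 0;

  EosFlowSketch(List<Kind> input) {
    _input = new ArrayDeque<>(input);
  }

  // Assumed contract: returns true once EOS has been read, false if upstream has nothing yet.
  private boolean consumeInputBlocks() {
    while (!_input.isEmpty()) {
      Kind block = _input.poll();
      if (block == Kind.NO_OP) {
        return false;
      }
      if (block == Kind.EOS) {
        _readyToConstruct = true;
        return true;
      }
      _rowsSeen++;
    }
    return false;
  }

  String getNextBlock() {
    // Same condition as the diff: only yield a no-op if EOS was NOT reached in this call.
    if (!_readyToConstruct && !consumeInputBlocks()) {
      return "NO_OP";
    }
    if (!_resultEmitted && _rowsSeen > 0) {
      _resultEmitted = true;
      return "RESULT(rows=" + _rowsSeen + ")";
    }
    return "EOS";
  }

  public static void main(String[] args) {
    EosFlowSketch operator =
        new EosFlowSketch(Arrays.asList(Kind.DATA, Kind.NO_OP, Kind.DATA, Kind.EOS));
    System.out.println(operator.getNextBlock());
    System.out.println(operator.getNextBlock());
    System.out.println(operator.getNextBlock());
  }
}
```

In this model the call that reads EOS returns the result directly, so the caller does not need one extra round trip through a no-op block before seeing output.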



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java:
##########
@@ -228,4 +240,38 @@ private static Key extraRowKey(Object[] row, List<RexExpression> groupSet) {
     }
     return new Key(keyElements);
   }
+
+  private static class Holder {

Review Comment:
   minor refactor so we don't need to maintain three different arrays (including `Map[]`, which is generally considered bad practice)
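
A rough sketch of why a holder object is easier to maintain than parallel arrays. The real `Holder` fields are not shown in the quoted diff, so everything inside this class (`_functionName`, `_inputRef`, `_results`, and the `aggregate` helper) is assumed for illustration. The point about `Map[]` is that Java cannot create generic arrays without an unchecked cast, whereas a `List<Holder>` stays fully typed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration; the actual Holder in the PR may carry different fields.
class HolderSketch {

  // One holder per aggregate call, replacing index-aligned arrays such as
  // String[] names, int[] inputRefs and Map<String, Double>[] results.
  static final class Holder {
    final String _functionName;
    final int _inputRef;
    final Map<String, Double> _results = new HashMap<>();

    Holder(String functionName, int inputRef) {
      _functionName = functionName;
      _inputRef = inputRef;
    }
  }

  // Sums the referenced column per group key (column 0 is assumed to be the group key).
  static void aggregate(List<Object[]> rows, List<Holder> holders) {
    for (Object[] row : rows) {
      String key = String.valueOf(row[0]);
      for (Holder holder : holders) {
        double value = ((Number) row[holder._inputRef]).doubleValue();
        holder._results.merge(key, value, Double::sum);
      }
    }
  }

  public static void main(String[] args) {
    List<Holder> holders = new ArrayList<>();
    holders.add(new Holder("SUM", 1));
    List<Object[]> rows = Arrays.asList(
        new Object[]{"a", 1}, new Object[]{"a", 2}, new Object[]{"b", 5});
    aggregate(rows, holders);
    System.out.println(holders.get(0)._results);
  }
}
```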





[GitHub] [pinot] 61yao commented on pull request #9767: [multistage] clean up AggregateOperator and add test coverage

Posted by GitBox <gi...@apache.org>.
61yao commented on PR #9767:
URL: https://github.com/apache/pinot/pull/9767#issuecomment-1308206325

   Great demonstration of how Java testing should work! I learned a lot of Mockito stuff from this PR.




[GitHub] [pinot] agavra commented on a diff in pull request #9767: [multistage] clean up AggregateOperator and add test coverage

Posted by GitBox <gi...@apache.org>.
agavra commented on code in PR #9767:
URL: https://github.com/apache/pinot/pull/9767#discussion_r1017411017


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java:
##########
@@ -53,17 +56,33 @@
  * If the input is single value, the output type will be input type. Otherwise, the output type will be double.
  */
 public class AggregateOperator extends BaseOperator<TransferableBlock> {
+
+  interface Merger extends BiFunction<Object, Object, Object> {
+  }
+
   private static final String EXPLAIN_NAME = "AGGREGATE_OPERATOR";
+  private static final Map<String, Merger> MERGERS = ImmutableMap

Review Comment:
   not totally sure what you mean - do you mean the constant or do you mean to use them within a method encapsulated within the holder?





[GitHub] [pinot] agavra commented on a diff in pull request #9767: [multistage] clean up AggregateOperator and add test coverage

Posted by GitBox <gi...@apache.org>.
agavra commented on code in PR #9767:
URL: https://github.com/apache/pinot/pull/9767#discussion_r1018255056


##########
pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java:
##########
@@ -18,26 +18,208 @@
  */
 package org.apache.pinot.query.runtime.operator;
 
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import java.util.Arrays;
 import java.util.List;
 import org.apache.calcite.sql.SqlKind;
+import org.apache.pinot.common.datablock.DataBlock;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.common.Operator;
 import org.apache.pinot.core.operator.BaseOperator;
 import org.apache.pinot.query.planner.logical.RexExpression;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
 import org.apache.pinot.spi.data.FieldSpec;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
 import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
+import static org.apache.pinot.common.utils.DataSchema.ColumnDataType.DOUBLE;
+import static org.apache.pinot.common.utils.DataSchema.ColumnDataType.INT;
 
 
 public class AggregateOperatorTest {
 
+  private AutoCloseable _mocks;
+
+  @Mock
+  private Operator<TransferableBlock> _input;
+
+  @BeforeMethod
+  public void setUp() {
+    _mocks = MockitoAnnotations.openMocks(this);
+  }
+
+  @AfterMethod
+  public void tearDown()
+      throws Exception {
+    _mocks.close();
+  }
+
+  @Test
+  public void shouldHandleUpstreamErrorBlocks() {
+    // Given:
+    List<RexExpression> calls = ImmutableList.of(getSum(new RexExpression.InputRef(1)));
+    List<RexExpression> group = ImmutableList.of(new RexExpression.InputRef(0));
+
+    Mockito.when(_input.nextBlock())
+        .thenReturn(TransferableBlockUtils.getErrorTransferableBlock(new Exception("foo!")));
+
+    DataSchema outSchema = new DataSchema(new String[]{"sum"}, new ColumnDataType[]{DOUBLE});
+    AggregateOperator operator = new AggregateOperator(_input, outSchema, calls, group);
+
+    // When:
+    TransferableBlock block1 = operator.nextBlock(); // build
+
+    // Then:
+    Mockito.verify(_input, Mockito.times(1)).nextBlock();
+    Assert.assertTrue(block1.isErrorBlock(), "Input errors should propagate immediately");
+  }
+
+  @Test
+  public void shouldHandleEndOfStreamBlockWithNoOtherInputs() {
+    // Given:
+    List<RexExpression> calls = ImmutableList.of(getSum(new RexExpression.InputRef(1)));
+    List<RexExpression> group = ImmutableList.of(new RexExpression.InputRef(0));
+
+    Mockito.when(_input.nextBlock())
+        .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+
+    DataSchema outSchema = new DataSchema(new String[]{"sum"}, new ColumnDataType[]{DOUBLE});
+    AggregateOperator operator = new AggregateOperator(_input, outSchema, calls, group);
+
+    // When:
+    TransferableBlock block = operator.nextBlock();
+
+    // Then:
+    Mockito.verify(_input, Mockito.times(1)).nextBlock();
+    Assert.assertTrue(block.isEndOfStreamBlock(), "EOS blocks should propagate");
+  }
+
+  @Test
+  public void shouldHandleUpstreamNoOpBlocksWhileConstructing() {
+    // Given:
+    List<RexExpression> calls = ImmutableList.of(getSum(new RexExpression.InputRef(1)));
+    List<RexExpression> group = ImmutableList.of(new RexExpression.InputRef(0));
+
+    DataSchema inSchema = new DataSchema(new String[]{"group", "arg"}, new ColumnDataType[]{INT, INT});
+    Mockito.when(_input.nextBlock())
+        .thenReturn(block(inSchema, new Object[]{1, 1}))
+        .thenReturn(TransferableBlockUtils.getNoOpTransferableBlock());
+
+    DataSchema outSchema = new DataSchema(new String[]{"sum"}, new ColumnDataType[]{DOUBLE});
+    AggregateOperator operator = new AggregateOperator(_input, outSchema, calls, group);

Review Comment:
   > but i am also ok if the goal is to keep these tests isolated and easy to maintain.
   
   that's my goal :) 





[GitHub] [pinot] agavra commented on a diff in pull request #9767: [multistage] clean up AggregateOperator and add test coverage

Posted by GitBox <gi...@apache.org>.
agavra commented on code in PR #9767:
URL: https://github.com/apache/pinot/pull/9767#discussion_r1018254221


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java:
##########
@@ -53,17 +56,32 @@
  * If the input is single value, the output type will be input type. Otherwise, the output type will be double.
  */
 public class AggregateOperator extends BaseOperator<TransferableBlock> {
+
+  interface Merger extends BiFunction<Object, Object, Object> {
+  }

Review Comment:
   it's used in testing, so it's package-private :) I can move it





[GitHub] [pinot] 61yao commented on a diff in pull request #9767: [multistage] clean up AggregateOperator and add test coverage

Posted by GitBox <gi...@apache.org>.
61yao commented on code in PR #9767:
URL: https://github.com/apache/pinot/pull/9767#discussion_r1017423971


##########
pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java:
##########
@@ -51,4 +233,27 @@ public void testGroupByAggregateWithHashCollision() {
     Assert.assertEquals(resultRows.get(0), expectedRows.get(0));
     Assert.assertEquals(resultRows.get(1), expectedRows.get(1));
   }
+
+  @Test(
+      expectedExceptions = IllegalStateException.class,
+      expectedExceptionsMessageRegExp = ".*Unexpected value: AVERAGE.*")
+  public void shouldThrowOnUnknownAggFunction() {

Review Comment:
   Can we also have a test covering when the input type is not a number? I assume this will throw an exception in Java?
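
As a small illustration of the question, and assuming the merger casts its inputs to `Number` (the quoted diff does not show the merger bodies, so this is a guess at their shape), a non-numeric input would surface as a `ClassCastException`. The class and method names below are hypothetical.

```java
import java.util.function.BiFunction;

// Hypothetical merger that casts to Number, to show what a non-numeric input would do.
class NonNumericInputSketch {

  static final BiFunction<Object, Object, Object> SUM =
      (a, b) -> ((Number) a).doubleValue() + ((Number) b).doubleValue();

  // Returns true if merging the two values fails with a ClassCastException.
  static boolean throwsOnMerge(Object left, Object right) {
    try {
      SUM.apply(left, right);
      return false;
    } catch (ClassCastException e) {
      return true;
    }
  }

  public static void main(String[] args) {
    System.out.println(throwsOnMerge(1, 2));
    System.out.println(throwsOnMerge("foo", 1));
  }
}
```

A TestNG test for the real operator could presumably use `expectedExceptions` in the same way as `shouldThrowOnUnknownAggFunction`, though which exception type the operator actually surfaces is not established in this thread.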





[GitHub] [pinot] codecov-commenter commented on pull request #9767: [multistage] clean up AggregateOperator and add test coverage

Posted by GitBox <gi...@apache.org>.
codecov-commenter commented on PR #9767:
URL: https://github.com/apache/pinot/pull/9767#issuecomment-1308138026

   # [Codecov](https://codecov.io/gh/apache/pinot/pull/9767?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#9767](https://codecov.io/gh/apache/pinot/pull/9767?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (340a124) into [master](https://codecov.io/gh/apache/pinot/commit/e3f2835686e3619ee0febae4ba19823125440e4a?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (e3f2835) will **decrease** coverage by `43.19%`.
   > The diff coverage is `0.00%`.
   
   ```diff
   @@              Coverage Diff              @@
   ##             master    #9767       +/-   ##
   =============================================
   - Coverage     68.58%   25.39%   -43.20%     
   + Complexity     4976       44     -4932     
   =============================================
     Files          1955     1943       -12     
     Lines        104851   104520      -331     
     Branches      15874    15836       -38     
   =============================================
   - Hits          71917    26539    -45378     
   - Misses        27869    75283    +47414     
   + Partials       5065     2698     -2367     
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | integration1 | `25.39% <0.00%> (?)` | |
   | integration2 | `?` | |
   | unittests1 | `?` | |
   | unittests2 | `?` | |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/pinot/pull/9767?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...inot/query/runtime/operator/AggregateOperator.java](https://codecov.io/gh/apache/pinot/pull/9767/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcXVlcnktcnVudGltZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvcnVudGltZS9vcGVyYXRvci9BZ2dyZWdhdGVPcGVyYXRvci5qYXZh) | `0.00% <0.00%> (-85.72%)` | :arrow_down: |
   | [...in/java/org/apache/pinot/spi/utils/BytesUtils.java](https://codecov.io/gh/apache/pinot/pull/9767/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3Qtc3BpL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9zcGkvdXRpbHMvQnl0ZXNVdGlscy5qYXZh) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...java/org/apache/pinot/spi/trace/BaseRecording.java](https://codecov.io/gh/apache/pinot/pull/9767/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3Qtc3BpL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9zcGkvdHJhY2UvQmFzZVJlY29yZGluZy5qYXZh) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...java/org/apache/pinot/spi/trace/NoOpRecording.java](https://codecov.io/gh/apache/pinot/pull/9767/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3Qtc3BpL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9zcGkvdHJhY2UvTm9PcFJlY29yZGluZy5qYXZh) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...ava/org/apache/pinot/spi/config/table/FSTType.java](https://codecov.io/gh/apache/pinot/pull/9767/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3Qtc3BpL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9zcGkvY29uZmlnL3RhYmxlL0ZTVFR5cGUuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...ava/org/apache/pinot/spi/config/user/RoleType.java](https://codecov.io/gh/apache/pinot/pull/9767/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3Qtc3BpL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9zcGkvY29uZmlnL3VzZXIvUm9sZVR5cGUuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...ava/org/apache/pinot/spi/data/MetricFieldSpec.java](https://codecov.io/gh/apache/pinot/pull/9767/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3Qtc3BpL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9zcGkvZGF0YS9NZXRyaWNGaWVsZFNwZWMuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...ava/org/apache/pinot/spi/stream/StreamMessage.java](https://codecov.io/gh/apache/pinot/pull/9767/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3Qtc3BpL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9zcGkvc3RyZWFtL1N0cmVhbU1lc3NhZ2UuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...java/org/apache/pinot/common/tier/TierFactory.java](https://codecov.io/gh/apache/pinot/pull/9767/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vdGllci9UaWVyRmFjdG9yeS5qYXZh) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...a/org/apache/pinot/spi/config/table/TableType.java](https://codecov.io/gh/apache/pinot/pull/9767/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3Qtc3BpL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9zcGkvY29uZmlnL3RhYmxlL1RhYmxlVHlwZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | ... and [1468 more](https://codecov.io/gh/apache/pinot/pull/9767/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   




[GitHub] [pinot] agavra commented on a diff in pull request #9767: [multistage] clean up AggregateOperator and add test coverage

Posted by GitBox <gi...@apache.org>.
agavra commented on code in PR #9767:
URL: https://github.com/apache/pinot/pull/9767#discussion_r1018222967


##########
pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java:
##########
@@ -51,4 +233,27 @@ public void testGroupByAggregateWithHashCollision() {
     Assert.assertEquals(resultRows.get(0), expectedRows.get(0));
     Assert.assertEquals(resultRows.get(1), expectedRows.get(1));
   }
+
+  @Test(
+      expectedExceptions = IllegalStateException.class,
+      expectedExceptionsMessageRegExp = ".*Unexpected value: AVERAGE.*")
+  public void shouldThrowOnUnknownAggFunction() {

Review Comment:
   that's a good idea, i'll add that as part of my next PR so that I don't have to get another green check on this one 😆





[GitHub] [pinot] walterddr commented on a diff in pull request #9767: [multistage] clean up AggregateOperator and add test coverage

Posted by GitBox <gi...@apache.org>.
walterddr commented on code in PR #9767:
URL: https://github.com/apache/pinot/pull/9767#discussion_r1018240339


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java:
##########
@@ -53,17 +56,32 @@
  * If the input is single value, the output type will be input type. Otherwise, the output type will be double.
  */
 public class AggregateOperator extends BaseOperator<TransferableBlock> {
+
+  interface Merger extends BiFunction<Object, Object, Object> {
+  }

Review Comment:
   nit: move next to Accumulator? also make it private? i don't foresee anyone else using this interface, and Accumulator is already a private member class



##########
pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java:
##########
@@ -18,26 +18,208 @@
  */
 package org.apache.pinot.query.runtime.operator;
 
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import java.util.Arrays;
 import java.util.List;
 import org.apache.calcite.sql.SqlKind;
+import org.apache.pinot.common.datablock.DataBlock;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.common.Operator;
 import org.apache.pinot.core.operator.BaseOperator;
 import org.apache.pinot.query.planner.logical.RexExpression;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
 import org.apache.pinot.spi.data.FieldSpec;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
 import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
+import static org.apache.pinot.common.utils.DataSchema.ColumnDataType.DOUBLE;
+import static org.apache.pinot.common.utils.DataSchema.ColumnDataType.INT;
 
 
 public class AggregateOperatorTest {
 
+  private AutoCloseable _mocks;
+
+  @Mock
+  private Operator<TransferableBlock> _input;
+
+  @BeforeMethod
+  public void setUp() {
+    _mocks = MockitoAnnotations.openMocks(this);
+  }
+
+  @AfterMethod
+  public void tearDown()
+      throws Exception {
+    _mocks.close();
+  }
+
+  @Test
+  public void shouldHandleUpstreamErrorBlocks() {
+    // Given:
+    List<RexExpression> calls = ImmutableList.of(getSum(new RexExpression.InputRef(1)));
+    List<RexExpression> group = ImmutableList.of(new RexExpression.InputRef(0));
+
+    Mockito.when(_input.nextBlock())
+        .thenReturn(TransferableBlockUtils.getErrorTransferableBlock(new Exception("foo!")));
+
+    DataSchema outSchema = new DataSchema(new String[]{"sum"}, new ColumnDataType[]{DOUBLE});
+    AggregateOperator operator = new AggregateOperator(_input, outSchema, calls, group);
+
+    // When:
+    TransferableBlock block1 = operator.nextBlock(); // build
+
+    // Then:
+    Mockito.verify(_input, Mockito.times(1)).nextBlock();
+    Assert.assertTrue(block1.isErrorBlock(), "Input errors should propagate immediately");
+  }
+
+  @Test
+  public void shouldHandleEndOfStreamBlockWithNoOtherInputs() {
+    // Given:
+    List<RexExpression> calls = ImmutableList.of(getSum(new RexExpression.InputRef(1)));
+    List<RexExpression> group = ImmutableList.of(new RexExpression.InputRef(0));
+
+    Mockito.when(_input.nextBlock())
+        .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+
+    DataSchema outSchema = new DataSchema(new String[]{"sum"}, new ColumnDataType[]{DOUBLE});
+    AggregateOperator operator = new AggregateOperator(_input, outSchema, calls, group);
+
+    // When:
+    TransferableBlock block = operator.nextBlock();
+
+    // Then:
+    Mockito.verify(_input, Mockito.times(1)).nextBlock();
+    Assert.assertTrue(block.isEndOfStreamBlock(), "EOS blocks should propagate");
+  }
+
+  @Test
+  public void shouldHandleUpstreamNoOpBlocksWhileConstructing() {
+    // Given:
+    List<RexExpression> calls = ImmutableList.of(getSum(new RexExpression.InputRef(1)));
+    List<RexExpression> group = ImmutableList.of(new RexExpression.InputRef(0));
+
+    DataSchema inSchema = new DataSchema(new String[]{"group", "arg"}, new ColumnDataType[]{INT, INT});
+    Mockito.when(_input.nextBlock())
+        .thenReturn(block(inSchema, new Object[]{1, 1}))
+        .thenReturn(TransferableBlockUtils.getNoOpTransferableBlock());
+
+    DataSchema outSchema = new DataSchema(new String[]{"sum"}, new ColumnDataType[]{DOUBLE});
+    AggregateOperator operator = new AggregateOperator(_input, outSchema, calls, group);

Review Comment:
   nit: these can be private static? 
   good thing is then you can use these to mock the operators. 
   but i am also ok if the goal is to keep these tests isolated and easy to maintain. 



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java:
##########
@@ -53,17 +56,32 @@
  * If the input is single value, the output type will be input type. Otherwise, the output type will be double.
  */
 public class AggregateOperator extends BaseOperator<TransferableBlock> {
+
+  interface Merger extends BiFunction<Object, Object, Object> {
+  }
+
   private static final String EXPLAIN_NAME = "AGGREGATE_OPERATOR";
+  private static final Map<String, Merger> MERGERS = ImmutableMap

Review Comment:
   this private static member can be in Accumulators?





[GitHub] [pinot] walterddr commented on a diff in pull request #9767: [multistage] clean up AggregateOperator and add test coverage

Posted by GitBox <gi...@apache.org>.
walterddr commented on code in PR #9767:
URL: https://github.com/apache/pinot/pull/9767#discussion_r1017357810


##########
pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java:
##########
@@ -18,26 +18,208 @@
  */
 package org.apache.pinot.query.runtime.operator;
 
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import java.util.Arrays;
 import java.util.List;
 import org.apache.calcite.sql.SqlKind;
+import org.apache.pinot.common.datablock.DataBlock;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.common.Operator;
 import org.apache.pinot.core.operator.BaseOperator;
 import org.apache.pinot.query.planner.logical.RexExpression;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
 import org.apache.pinot.spi.data.FieldSpec;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
 import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
+import static org.apache.pinot.common.utils.DataSchema.ColumnDataType.DOUBLE;
+import static org.apache.pinot.common.utils.DataSchema.ColumnDataType.INT;
 
 
 public class AggregateOperatorTest {
 
+  private AutoCloseable _mocks;
+
+  @Mock
+  private Operator<TransferableBlock> _input;

Review Comment:
   using this looks more promising than the mock operator factory if we are testing not just the data but also the mock behaviors (e.g. validate that a method was called, or called with specific arguments)



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java:
##########
@@ -53,17 +56,33 @@
  * If the input is single value, the output type will be input type. Otherwise, the output type will be double.
  */
 public class AggregateOperator extends BaseOperator<TransferableBlock> {
+
+  interface Merger extends BiFunction<Object, Object, Object> {
+  }
+
   private static final String EXPLAIN_NAME = "AGGREGATE_OPERATOR";
+  private static final Map<String, Merger> MERGERS = ImmutableMap

Review Comment:
   can we put these together with the holder?



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java:
##########
@@ -228,4 +240,38 @@ private static Key extraRowKey(Object[] row, List<RexExpression> groupSet) {
     }
     return new Key(keyElements);
   }
+
+  private static class Holder {

Review Comment:
   this refactoring is great. i was wondering if we can refactor this further into an `Accumulator` class which holds the data but also holds the merge methods. it doesn't necessarily need to be in this PR though
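
One possible shape for the suggested `Accumulator`, sketched under assumptions: the class owns both the per-group results and the merge function, so callers only hand it rows. The field names, the `accumulate` and `result` methods, and the use of `Map.merge` are illustrative choices, not what the PR ended up with; only `Merger` and the `Accumulator` name come from the thread.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;

// Hypothetical Accumulator that keeps data and merge logic together.
class AccumulatorSketch {

  interface Merger extends BiFunction<Object, Object, Object> {
  }

  static final class Accumulator {
    private final int _inputRef;
    private final Merger _merger;
    private final Map<Object, Object> _results = new HashMap<>();

    Accumulator(int inputRef, Merger merger) {
      _inputRef = inputRef;
      _merger = merger;
    }

    // The first value for a key is stored as-is; later values go through the merger.
    void accumulate(Object key, Object[] row) {
      _results.merge(key, row[_inputRef], _merger);
    }

    Object result(Object key) {
      return _results.get(key);
    }
  }

  public static void main(String[] args) {
    Accumulator sum = new Accumulator(1,
        (a, b) -> ((Number) a).doubleValue() + ((Number) b).doubleValue());
    sum.accumulate("k", new Object[]{"k", 1});
    sum.accumulate("k", new Object[]{"k", 2});
    System.out.println(sum.result("k"));
  }
}
```

In this sketch a group that sees a single row keeps the input's own type, while merged groups become `Double`, which lines up with the class Javadoc quoted in the diff ("If the input is single value, the output type will be input type. Otherwise, the output type will be double.").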



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java:
##########
@@ -123,11 +138,14 @@ public String toExplainString() {
   @Override
   protected TransferableBlock getNextBlock() {
     try {
-      if (!_readyToConstruct) {
-        consumeInputBlocks();
+      if (!_readyToConstruct && !consumeInputBlocks()) {

Review Comment:
   👍





[GitHub] [pinot] agavra commented on a diff in pull request #9767: [multistage] clean up AggregateOperator and add test coverage

Posted by GitBox <gi...@apache.org>.
agavra commented on code in PR #9767:
URL: https://github.com/apache/pinot/pull/9767#discussion_r1017411170


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java:
##########
@@ -228,4 +240,38 @@ private static Key extraRowKey(Object[] row, List<RexExpression> groupSet) {
     }
     return new Key(keyElements);
   }
+
+  private static class Holder {

Review Comment:
   ah 👍 I think this clarifies my question above. yeah I think that's a good refactor, I'll do that





[GitHub] [pinot] walterddr merged pull request #9767: [multistage] clean up AggregateOperator and add test coverage

Posted by GitBox <gi...@apache.org>.
walterddr merged PR #9767:
URL: https://github.com/apache/pinot/pull/9767

