You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2022/12/07 22:49:23 UTC

[GitHub] [pinot] walterddr commented on a diff in pull request #9929: [multistage][reland] fix leaf stage return data schema

walterddr commented on code in PR #9929:
URL: https://github.com/apache/pinot/pull/9929#discussion_r1042756492


##########
pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperatorTest.java:
##########
@@ -24,98 +24,222 @@
 import java.util.List;
 import org.apache.pinot.common.exception.QueryException;
 import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.data.table.Record;
 import org.apache.pinot.core.operator.blocks.InstanceResponseBlock;
+import org.apache.pinot.core.operator.blocks.results.AggregationResultsBlock;
+import org.apache.pinot.core.operator.blocks.results.DistinctResultsBlock;
+import org.apache.pinot.core.operator.blocks.results.GroupByResultsBlock;
 import org.apache.pinot.core.operator.blocks.results.SelectionResultsBlock;
+import org.apache.pinot.core.query.aggregation.function.DistinctAggregationFunction;
+import org.apache.pinot.core.query.distinct.DistinctTable;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
+import static org.mockito.Mockito.mock;
 
+
+// TODO: add tests for Agg / GroupBy / Distinct result blocks
 public class LeafStageTransferableBlockOperatorTest {
 
   @Test
   public void shouldReturnDataBlockThenMetadataBlock() {
     // Given:
+    QueryContext queryContext = QueryContextConverterUtils.getQueryContext("SELECT strCol, intCol FROM tbl");
     DataSchema schema = new DataSchema(new String[]{"strCol", "intCol"},
         new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.INT});
     List<InstanceResponseBlock> resultsBlockList = Collections.singletonList(new InstanceResponseBlock(
-        new SelectionResultsBlock(schema, Arrays.asList(new Object[]{"foo", 1}, new Object[]{"", 2})), null));
+        new SelectionResultsBlock(schema, Arrays.asList(new Object[]{"foo", 1}, new Object[]{"", 2})), queryContext));
     LeafStageTransferableBlockOperator operator = new LeafStageTransferableBlockOperator(resultsBlockList, schema);
 
     // When:
-    TransferableBlock transferableBlock = operator.nextBlock();
+    TransferableBlock resultBlock = operator.nextBlock();
 
     // Then:
-    Assert.assertEquals(transferableBlock.getContainer().get(0), new Object[]{"foo", 1});
-    Assert.assertEquals(transferableBlock.getContainer().get(1), new Object[]{"", 2});
+    Assert.assertEquals(resultBlock.getContainer().get(0), new Object[]{"foo", 1});
+    Assert.assertEquals(resultBlock.getContainer().get(1), new Object[]{"", 2});
+    Assert.assertTrue(operator.nextBlock().isEndOfStreamBlock(), "Expected EOS after reading two rows");
+  }
+
+  @Test
+  public void shouldHandleDesiredDataSchemaConversionCorrectly() {
+    // Given:
+    QueryContext queryContext = QueryContextConverterUtils.getQueryContext(
+        "SELECT boolCol, tsCol, boolCol AS newNamedBoolCol FROM tbl");
+    DataSchema resultSchema = new DataSchema(new String[]{"boolCol", "tsCol"},
+        new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.BOOLEAN, DataSchema.ColumnDataType.TIMESTAMP});
+    DataSchema desiredSchema = new DataSchema(new String[]{"boolCol", "tsCol", "newNamedBoolCol"},
+        new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.BOOLEAN, DataSchema.ColumnDataType.TIMESTAMP,
+            DataSchema.ColumnDataType.BOOLEAN});
+    List<InstanceResponseBlock> resultsBlockList = Collections.singletonList(new InstanceResponseBlock(
+        new SelectionResultsBlock(resultSchema, Arrays.asList(new Object[]{1, 1660000000000L},
+            new Object[]{0, 1600000000000L})), queryContext));
+    LeafStageTransferableBlockOperator operator = new LeafStageTransferableBlockOperator(resultsBlockList,
+        desiredSchema);
+
+    // When:
+    TransferableBlock resultBlock = operator.nextBlock();
+
+    // Then:
+    Assert.assertEquals(resultBlock.getContainer().get(0), new Object[]{true, new Timestamp(1660000000000L), true});
+    Assert.assertEquals(resultBlock.getContainer().get(1), new Object[]{false, new Timestamp(1600000000000L), false});
     Assert.assertTrue(operator.nextBlock().isEndOfStreamBlock(), "Expected EOS after reading two rows");
   }
 
   @Test
   public void shouldHandleCanonicalizationCorrectly() {
     // TODO: not all stored types are supported, add additional datatype when they are supported.
     // Given:
+    QueryContext queryContext = QueryContextConverterUtils.getQueryContext("SELECT boolCol, tsCol FROM tbl");
     DataSchema schema = new DataSchema(new String[]{"boolCol", "tsCol"},
         new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.BOOLEAN, DataSchema.ColumnDataType.TIMESTAMP});
     List<InstanceResponseBlock> resultsBlockList = Collections.singletonList(new InstanceResponseBlock(
         new SelectionResultsBlock(schema, Arrays.asList(new Object[]{1, 1660000000000L},
-            new Object[]{0, 1600000000000L})), null));
+            new Object[]{0, 1600000000000L})), queryContext));
     LeafStageTransferableBlockOperator operator = new LeafStageTransferableBlockOperator(resultsBlockList, schema);
 
     // When:
-    TransferableBlock transferableBlock = operator.nextBlock();
+    TransferableBlock resultBlock = operator.nextBlock();
 
     // Then:
-    Assert.assertEquals(transferableBlock.getContainer().get(0), new Object[]{true, new Timestamp(1660000000000L)});
-    Assert.assertEquals(transferableBlock.getContainer().get(1), new Object[]{false, new Timestamp(1600000000000L)});
+    Assert.assertEquals(resultBlock.getContainer().get(0), new Object[]{true, new Timestamp(1660000000000L)});
+    Assert.assertEquals(resultBlock.getContainer().get(1), new Object[]{false, new Timestamp(1600000000000L)});
     Assert.assertTrue(operator.nextBlock().isEndOfStreamBlock(), "Expected EOS after reading two rows");
   }
 
   @Test
   public void shouldReturnMultipleDataBlockThenMetadataBlock() {
     // Given:
+    QueryContext queryContext = QueryContextConverterUtils.getQueryContext("SELECT strCol, intCol FROM tbl");
     DataSchema schema = new DataSchema(new String[]{"strCol", "intCol"},
         new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.INT});
     List<InstanceResponseBlock> resultsBlockList = Arrays.asList(
         new InstanceResponseBlock(new SelectionResultsBlock(schema,
-            Arrays.asList(new Object[]{"foo", 1}, new Object[]{"", 2})), null),
+            Arrays.asList(new Object[]{"foo", 1}, new Object[]{"", 2})), queryContext),
         new InstanceResponseBlock(new SelectionResultsBlock(schema,
-            Arrays.asList(new Object[]{"bar", 3}, new Object[]{"foo", 4})), null),
-        new InstanceResponseBlock(new SelectionResultsBlock(schema, Collections.emptyList()), null));
+            Arrays.asList(new Object[]{"bar", 3}, new Object[]{"foo", 4})), queryContext),
+        new InstanceResponseBlock(new SelectionResultsBlock(schema, Collections.emptyList()), queryContext));
     LeafStageTransferableBlockOperator operator = new LeafStageTransferableBlockOperator(resultsBlockList, schema);
 
     // When:
-    TransferableBlock transferableBlock1 = operator.nextBlock();
-    TransferableBlock transferableBlock2 = operator.nextBlock();
-    TransferableBlock transferableBlock3 = operator.nextBlock();
+    TransferableBlock resultBlock1 = operator.nextBlock();
+    TransferableBlock resultBlock2 = operator.nextBlock();
+    TransferableBlock resultBlock3 = operator.nextBlock();
 
     // Then:
-    Assert.assertEquals(transferableBlock1.getContainer().get(0), new Object[]{"foo", 1});
-    Assert.assertEquals(transferableBlock1.getContainer().get(1), new Object[]{"", 2});
-    Assert.assertEquals(transferableBlock2.getContainer().get(0), new Object[]{"bar", 3});
-    Assert.assertEquals(transferableBlock2.getContainer().get(1), new Object[]{"foo", 4});
-    Assert.assertEquals(transferableBlock3.getContainer().size(), 0);
+    Assert.assertEquals(resultBlock1.getContainer().get(0), new Object[]{"foo", 1});
+    Assert.assertEquals(resultBlock1.getContainer().get(1), new Object[]{"", 2});
+    Assert.assertEquals(resultBlock2.getContainer().get(0), new Object[]{"bar", 3});
+    Assert.assertEquals(resultBlock2.getContainer().get(1), new Object[]{"foo", 4});
+    Assert.assertEquals(resultBlock3.getContainer().size(), 0);
     Assert.assertTrue(operator.nextBlock().isEndOfStreamBlock(), "Expected EOS after reading two rows");
   }
 
   @Test
   public void shouldGetErrorBlockWhenInstanceResponseContainsError() {
     // Given:
+    QueryContext queryContext = QueryContextConverterUtils.getQueryContext("SELECT strCol, intCol FROM tbl");
     DataSchema schema = new DataSchema(new String[]{"strCol", "intCol"},
         new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.INT});
     InstanceResponseBlock errorBlock = new InstanceResponseBlock();
     errorBlock.addException(QueryException.QUERY_EXECUTION_ERROR.getErrorCode(), "foobar");
     List<InstanceResponseBlock> resultsBlockList = Arrays.asList(
         new InstanceResponseBlock(new SelectionResultsBlock(schema,
-            Arrays.asList(new Object[]{"foo", 1}, new Object[]{"", 2})), null),
+            Arrays.asList(new Object[]{"foo", 1}, new Object[]{"", 2})), queryContext),
         errorBlock,
-        new InstanceResponseBlock(new SelectionResultsBlock(schema, Collections.emptyList()), null));
+        new InstanceResponseBlock(new SelectionResultsBlock(schema, Collections.emptyList()), queryContext));
+    LeafStageTransferableBlockOperator operator = new LeafStageTransferableBlockOperator(resultsBlockList, schema);
+
+    // When:
+    TransferableBlock resultBlock = operator.nextBlock();
+
+    // Then:
+    Assert.assertTrue(resultBlock.isErrorBlock());
+  }
+
+  @Test
+  public void shouldReorderWhenQueryContextAskForNotInOrderGroupByAsDistinct() {
+    // Given:
+    QueryContext queryContext = QueryContextConverterUtils.getQueryContext(
+        "SELECT intCol, strCol FROM tbl GROUP BY strCol, intCol");
+    // result schema doesn't match with DISTINCT columns using GROUP BY.
+    DataSchema schema = new DataSchema(new String[]{"intCol", "strCol"},
+        new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING});
+    List<InstanceResponseBlock> resultsBlockList = Collections.singletonList(
+        new InstanceResponseBlock(new DistinctResultsBlock(mock(DistinctAggregationFunction.class),
+            new DistinctTable(schema, Arrays.asList(
+                new Record(new Object[]{1, "foo"}), new Record(new Object[]{2, "bar"})))), queryContext));
+    LeafStageTransferableBlockOperator operator = new LeafStageTransferableBlockOperator(resultsBlockList, schema);
+
+    // When:
+    TransferableBlock resultBlock = operator.nextBlock();
+
+    // Then:
+    Assert.assertEquals(resultBlock.getContainer().get(0), new Object[]{1, "foo"});
+    Assert.assertEquals(resultBlock.getContainer().get(1), new Object[]{2, "bar"});
+  }
+
+  @Test
+  public void shouldParsedBlocksSuccessfullyWithDistinctQuery() {

Review Comment:
   It is being tested via SQL JSON.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org