You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2022/11/10 18:33:06 UTC

[GitHub] [pinot] agavra commented on a diff in pull request #9775: [multistage] Join operator unit test and some failed integration tests (commented out)

agavra commented on code in PR #9775:
URL: https://github.com/apache/pinot/pull/9775#discussion_r1019483576


##########
pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java:
##########
@@ -22,18 +22,45 @@
 import java.util.Arrays;
 import java.util.List;
 import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.pinot.common.datablock.MetadataBlock;
 import org.apache.pinot.common.utils.DataSchema;
-import org.apache.pinot.core.operator.BaseOperator;
+import org.apache.pinot.core.common.Operator;
 import org.apache.pinot.query.planner.logical.RexExpression;
 import org.apache.pinot.query.planner.partitioning.FieldSelectionKeySelector;
 import org.apache.pinot.query.planner.stage.JoinNode;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
 import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 
-
 public class HashJoinOperatorTest {
+  private AutoCloseable _mocks;

Review Comment:
   Can we add tests for handling upstream NOOP, EOS (with no other rows, e.g. when one side is empty), and ERROR blocks?



##########
pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java:
##########
@@ -42,80 +69,323 @@ private static JoinNode.JoinKeys getJoinKeys(List<Integer> leftIdx, List<Integer
 
   @Test
   public void testHashJoinKeyCollisionInnerJoin() {
-    BaseOperator<TransferableBlock> leftOperator = OperatorTestUtil.getOperator(OperatorTestUtil.OP_1);
-    BaseOperator<TransferableBlock> rightOperator = OperatorTestUtil.getOperator(OperatorTestUtil.OP_1);
-    List<RexExpression> joinClauses = new ArrayList<>();
-    DataSchema resultSchema = new DataSchema(new String[]{"foo", "bar", "foo", "bar"}, new DataSchema.ColumnDataType[]{
-        DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.INT,
-        DataSchema.ColumnDataType.STRING
+    DataSchema leftSchema = new DataSchema(new String[]{"int_col", "string_col"}, new DataSchema.ColumnDataType[]{
+        DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING
+    });
+    DataSchema rightSchema = new DataSchema(new String[]{"int_col", "string_col"}, new DataSchema.ColumnDataType[]{
+        DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING
     });
-    HashJoinOperator join = new HashJoinOperator(leftOperator, rightOperator, resultSchema,
+    List<RexExpression> joinClauses = new ArrayList<>();
+    Mockito.when(_leftOperator.nextBlock())
+        .thenReturn(OperatorTestUtil.block(leftSchema, new Object[]{1, "Aa"}, new Object[]{2, "BB"}))
+        .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+    Mockito.when(_rightOperator.nextBlock()).thenReturn(
+            OperatorTestUtil.block(rightSchema, new Object[]{2, "Aa"}, new Object[]{2, "BB"}, new Object[]{3, "BB"}))
+        .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+    DataSchema resultSchema = new DataSchema(new String[]{"int_col1", "string_col1", "int_col2", "string_col2"},
+        new DataSchema.ColumnDataType[]{
+            DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.INT,
+            DataSchema.ColumnDataType.STRING
+        });
+    HashJoinOperator joinOnString = new HashJoinOperator(_leftOperator, _rightOperator, resultSchema,
         getJoinKeys(Arrays.asList(1), Arrays.asList(1)), joinClauses, JoinRelType.INNER);
 
-    TransferableBlock result = join.nextBlock();
+    TransferableBlock result = joinOnString.nextBlock();
     while (result.isNoOpBlock()) {
-      result = join.nextBlock();
+      result = joinOnString.nextBlock();
     }
     List<Object[]> resultRows = result.getContainer();
     List<Object[]> expectedRows =
-        Arrays.asList(new Object[]{1, "Aa", 1, "Aa"}, new Object[]{2, "BB", 2, "BB"}, new Object[]{2, "BB", 3, "BB"},
-            new Object[]{3, "BB", 2, "BB"}, new Object[]{3, "BB", 3, "BB"});
-    Assert.assertEquals(expectedRows.size(), resultRows.size());
-    Assert.assertEquals(expectedRows.get(0), resultRows.get(0));
-    Assert.assertEquals(expectedRows.get(1), resultRows.get(1));
-    Assert.assertEquals(expectedRows.get(2), resultRows.get(2));
-    Assert.assertEquals(expectedRows.get(3), resultRows.get(3));
-    Assert.assertEquals(expectedRows.get(4), resultRows.get(4));
+        Arrays.asList(new Object[]{1, "Aa", 2, "Aa"}, new Object[]{2, "BB", 2, "BB"}, new Object[]{2, "BB", 3, "BB"});
+    Assert.assertEquals(resultRows.size(), expectedRows.size());
+    Assert.assertEquals(resultRows.get(0), expectedRows.get(0));
+    Assert.assertEquals(resultRows.get(1), expectedRows.get(1));
+    Assert.assertEquals(resultRows.get(2), expectedRows.get(2));
   }
 
   @Test
-  public void testInnerJoin() {
-    BaseOperator<TransferableBlock> leftOperator = OperatorTestUtil.getOperator(OperatorTestUtil.OP_1);
-    BaseOperator<TransferableBlock> rightOperator = OperatorTestUtil.getOperator(OperatorTestUtil.OP_2);
+  public void testInnerJoinOnInt() {
+    DataSchema leftSchema = new DataSchema(new String[]{"int_col", "string_col"}, new DataSchema.ColumnDataType[]{
+        DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING
+    });
+    DataSchema rightSchema = new DataSchema(new String[]{"int_col", "string_col"}, new DataSchema.ColumnDataType[]{
+        DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING
+    });
+    Mockito.when(_leftOperator.nextBlock())
+        .thenReturn(OperatorTestUtil.block(leftSchema, new Object[]{1, "Aa"}, new Object[]{2, "BB"}))
+        .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+    Mockito.when(_rightOperator.nextBlock()).thenReturn(
+            OperatorTestUtil.block(rightSchema, new Object[]{2, "Aa"}, new Object[]{2, "BB"}, new Object[]{3, "BB"}))
+        .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+    List<RexExpression> joinClauses = new ArrayList<>();
+    DataSchema resultSchema = new DataSchema(new String[]{"int_col1", "string_col1", "int_col2", "string_co2"},
+        new DataSchema.ColumnDataType[]{
+            DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.INT,
+            DataSchema.ColumnDataType.STRING
+        });
+    HashJoinOperator joinOnInt = new HashJoinOperator(_leftOperator, _rightOperator, resultSchema,
+        getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses, JoinRelType.INNER);
+    TransferableBlock result = joinOnInt.nextBlock();
+    while (result.isNoOpBlock()) {
+      result = joinOnInt.nextBlock();
+    }
+    List<Object[]> resultRows = result.getContainer();
+    List<Object[]> expectedRows = Arrays.asList(new Object[]{2, "BB", 2, "Aa"}, new Object[]{2, "BB", 2, "BB"});
+    Assert.assertEquals(resultRows.size(), expectedRows.size());
+    Assert.assertEquals(resultRows.get(0), expectedRows.get(0));
+    Assert.assertEquals(resultRows.get(1), expectedRows.get(1));
+  }
 
+  @Test
+  public void testJoinOnEmptySelector() {
+    DataSchema leftSchema = new DataSchema(new String[]{"int_col", "string_col"}, new DataSchema.ColumnDataType[]{
+        DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING
+    });
+    DataSchema rightSchema = new DataSchema(new String[]{"int_col", "string_col"}, new DataSchema.ColumnDataType[]{
+        DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING
+    });
+    Mockito.when(_leftOperator.nextBlock())
+        .thenReturn(OperatorTestUtil.block(leftSchema, new Object[]{1, "Aa"}, new Object[]{2, "BB"}))
+        .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+    Mockito.when(_rightOperator.nextBlock()).thenReturn(
+            OperatorTestUtil.block(rightSchema, new Object[]{2, "Aa"}, new Object[]{2, "BB"}, new Object[]{3, "BB"}))
+        .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
     List<RexExpression> joinClauses = new ArrayList<>();
-    DataSchema resultSchema = new DataSchema(new String[]{"foo", "bar", "foo", "bar"}, new DataSchema.ColumnDataType[]{
-        DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.INT,
-        DataSchema.ColumnDataType.STRING
+    DataSchema resultSchema = new DataSchema(new String[]{"int_col1", "string_col1", "int_col2", "string_co2"},
+        new DataSchema.ColumnDataType[]{
+            DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.INT,
+            DataSchema.ColumnDataType.STRING
+        });
+    HashJoinOperator joinOnInt = new HashJoinOperator(_leftOperator, _rightOperator, resultSchema,
+        getJoinKeys(new ArrayList<>(), new ArrayList<>()), joinClauses, JoinRelType.INNER);
+    TransferableBlock result = joinOnInt.nextBlock();
+    while (result.isNoOpBlock()) {
+      result = joinOnInt.nextBlock();
+    }
+    List<Object[]> resultRows = result.getContainer();
+    List<Object[]> expectedRows =
+        Arrays.asList(new Object[]{1, "Aa", 2, "Aa"}, new Object[]{1, "Aa", 2, "BB"}, new Object[]{1, "Aa", 3, "BB"},
+            new Object[]{2, "BB", 2, "Aa"}, new Object[]{2, "BB", 2, "BB"}, new Object[]{2, "BB", 3, "BB"});
+    Assert.assertEquals(resultRows.size(), expectedRows.size());
+    Assert.assertEquals(resultRows.get(0), expectedRows.get(0));
+    Assert.assertEquals(resultRows.get(1), expectedRows.get(1));
+    Assert.assertEquals(resultRows.get(2), expectedRows.get(2));
+    Assert.assertEquals(resultRows.get(3), expectedRows.get(3));
+    Assert.assertEquals(resultRows.get(4), expectedRows.get(4));
+    Assert.assertEquals(resultRows.get(5), expectedRows.get(5));
+  }
+
+  @Test
+  public void testLeftJoin() {
+    DataSchema leftSchema = new DataSchema(new String[]{"int_col", "string_col"}, new DataSchema.ColumnDataType[]{
+        DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING
     });
-    HashJoinOperator join = new HashJoinOperator(leftOperator, rightOperator, resultSchema,
-        getJoinKeys(Arrays.asList(1), Arrays.asList(1)), joinClauses, JoinRelType.INNER);
+    DataSchema rightSchema = new DataSchema(new String[]{"int_col", "string_col"}, new DataSchema.ColumnDataType[]{
+        DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING
+    });
+    Mockito.when(_leftOperator.nextBlock())
+        .thenReturn(OperatorTestUtil.block(leftSchema, new Object[]{1, "Aa"}, new Object[]{2, "CC"}))
+        .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+    Mockito.when(_rightOperator.nextBlock()).thenReturn(
+            OperatorTestUtil.block(rightSchema, new Object[]{2, "Aa"}, new Object[]{2, "BB"}, new Object[]{3, "BB"}))
+        .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+
+    List<RexExpression> joinClauses = new ArrayList<>();
+    DataSchema resultSchema = new DataSchema(new String[]{"int_col1", "string_col1", "int_co2", "string_col2"},
+        new DataSchema.ColumnDataType[]{
+            DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.INT,
+            DataSchema.ColumnDataType.STRING
+        });
+    HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, resultSchema,
+        getJoinKeys(Arrays.asList(1), Arrays.asList(1)), joinClauses, JoinRelType.LEFT);
 
     TransferableBlock result = join.nextBlock();
     while (result.isNoOpBlock()) {
       result = join.nextBlock();
     }
     List<Object[]> resultRows = result.getContainer();
-    Object[] expRow = new Object[]{1, "Aa", 2, "Aa"};
-    List<Object[]> expectedRows = new ArrayList<>();
-    expectedRows.add(expRow);
-    Assert.assertEquals(expectedRows.size(), resultRows.size());
-    Assert.assertEquals(expectedRows.get(0), resultRows.get(0));
+    List<Object[]> expectedRows = Arrays.asList(new Object[]{1, "Aa", 2, "Aa"}, new Object[]{2, "CC", null, null});

Review Comment:
   Can we also add a test for a 1:N left join (e.g. one record on the left joins with multiple records on the right)?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org