You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by GitBox <gi...@apache.org> on 2020/06/29 18:18:52 UTC

[GitHub] [samza] b-slim commented on a change in pull request #1393: SAMZA-2554: Fix the handling of join condition against remote table

b-slim commented on a change in pull request #1393:
URL: https://github.com/apache/samza/pull/1393#discussion_r447163250



##########
File path: samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
##########
@@ -193,88 +215,133 @@ private void validateJoinQuery(LogicalJoin join, JoinInputNode.InputType inputTy
           dumpRelPlanForNode(join));
     }
 
-    if (joinRelType.compareTo(JoinRelType.LEFT) == 0 && isTablePosOnLeft && !isTablePosOnRight) {
+    if (joinRelType.compareTo(JoinRelType.LEFT) == 0 && isTablePosOnLeft) {
       throw new SamzaException("Invalid query for outer left join. Left side of the join should be a 'stream' and "
           + "right side of join should be a 'table'. " + dumpRelPlanForNode(join));
     }
 
-    if (joinRelType.compareTo(JoinRelType.RIGHT) == 0 && isTablePosOnRight && !isTablePosOnLeft) {
+    if (joinRelType.compareTo(JoinRelType.RIGHT) == 0 && isTablePosOnRight) {
       throw new SamzaException("Invalid query for outer right join. Left side of the join should be a 'table' and "
           + "right side of join should be a 'stream'. " + dumpRelPlanForNode(join));
     }
 
-    validateJoinCondition(join.getCondition());
-  }
-
-  private void validateJoinCondition(RexNode operand) {
-    if (!(operand instanceof RexCall)) {
-      throw new SamzaException("SQL Query is not supported. Join condition operand " + operand +
-          " is of type " + operand.getClass());
-    }
-
-    RexCall condition = (RexCall) operand;
+    final List<RexNode> conjunctionList = new ArrayList<>();
+    decomposeAndValidateConjunction(join.getCondition(), conjunctionList);
 
-    if (condition.isAlwaysTrue()) {
+    if (conjunctionList.isEmpty()) {
       throw new SamzaException("Query results in a cross join, which is not supported. Please optimize the query."
           + " It is expected that the joins should include JOIN ON operator in the sql query.");
     }
-
-    if (condition.getKind() != SqlKind.EQUALS && condition.getKind() != SqlKind.AND) {
-      throw new SamzaException("Only equi-joins and AND operator is supported in join condition.");
-    }
-  }
-
-  // Fetch the stream and table key indices corresponding to the fields given in the join condition by parsing through
-  // the condition. Stream and table key indices are populated in streamKeyIds and tableKeyIds respectively.
-  private void populateStreamAndTableKeyIds(List<RexNode> operands, final LogicalJoin join, boolean isTablePosOnRight,
-      List<Integer> streamKeyIds, List<Integer> tableKeyIds) {
-
-    // All non-leaf operands in the join condition should be expressions.
-    if (operands.get(0) instanceof RexCall) {
-      operands.forEach(operand -> {
-        validateJoinCondition(operand);
-        populateStreamAndTableKeyIds(((RexCall) operand).getOperands(), join, isTablePosOnRight, streamKeyIds, tableKeyIds);
-      });
+    //TODO Not sure why we can not allow literal as part of the join condition will revisit this in another scope
+    conjunctionList.forEach(rexNode -> rexNode.accept(new RexShuttle(){
+      @Override
+      public RexNode visitLiteral(RexLiteral literal) {
+        throw new SamzaException(
+            "Join Condition can not allow literal " + literal.toString() + " join node" + join.getDigest());
+      }
+    }));
+    final JoinInputNode.InputType rootTableInput = isTablePosOnRight ? inputTypeOnRight : inputTypeOnLeft;
+    if (rootTableInput.compareTo(JoinInputNode.InputType.REMOTE_TABLE) != 0) {
+      // it is not a remote table all is good we do not have to validate the project on key Column
       return;
     }
 
-    // We are at the leaf of the join condition. Only binary operators are supported.
-    Validate.isTrue(operands.size() == 2);
+    /*
+    For remote Table we need to validate The join Condition and The project that is above remote table scan.
+     - As of today Filter need to be exactly one equi-join using the __key__ column (see SAMZA-2554)
+     - The Project on the top of the remote table has to contain only simple input references to any of the column used in the join.
+    */
+
+    // First let's collect the ref of columns used by the join condition.
+    List<RexInputRef> refCollector = new ArrayList<>();
+    join.getCondition().accept(new RexShuttle(){
+      @Override
+      public RexNode visitInputRef(RexInputRef inputRef) {
+        refCollector.add(inputRef);
+        return inputRef;
+      }
+    });
+    // start index of the Remote table within the Join Row
+    final int tableStartIndex = isTablePosOnRight ? join.getLeft().getRowType().getFieldCount() : 0;
+    // end index of the Remote table withing the Join Row
+    final int tableEndIndex =
+        isTablePosOnRight ? join.getRowType().getFieldCount() : join.getLeft().getRowType().getFieldCount();
+
+    List<Integer> tableRefsIdx = refCollector.stream()
+        .map(x -> x.getIndex())
+        .filter(x -> tableStartIndex <= x && x < tableEndIndex) // collect all the refs form table side
+        .map(x -> x - tableStartIndex) // re-adjust the offset
+        .sorted()
+        .collect(Collectors.toList()); // we have a list with all the input from table side with 0 based index.
+
+    // Validate the Condition must contain a ref to remote table primary key column.
+
+    if (conjunctionList.size() != 1 || tableRefsIdx.size() != 1) {
+      //TODO We can relax this by allowing another filter to be evaluated post lookup see SAMZA-2554
+      throw new SamzaException(
+          "Invalid query for join condition must contain exactly one predicate for remote table on __key__ column "
+              + dumpRelPlanForNode(join));
+    }
 
-    // Only reference operands are supported in row expressions and not constants.
-    // a.key = b.key is supported with a.key and b.key being reference operands.
-    // a.key = "constant" is not yet supported.
-    if (!(operands.get(0) instanceof RexInputRef) || !(operands.get(1) instanceof RexInputRef)) {
-      throw new SamzaException("SQL query is not supported. Join condition " + join.getCondition() + " should have "
-          + "reference operands but the types are " + operands.get(0).getClass() + " and " + operands.get(1).getClass());
+    // Validate the Project, follow each input and ensure that it is a simple ref with no rexCall in the way.
+    if (!isValidRemoteJoinRef(tableRefsIdx.get(0), isTablePosOnRight ? join.getRight() : join.getLeft())) {
+      throw new SamzaException("Invalid query for join condition can not have an expression and must be reference "
+          + SamzaSqlRelMessage.KEY_NAME + " column " + dumpRelPlanForNode(join));
     }
+  }
 
-    // Join condition is commutative, meaning, a.key = b.key is equivalent to b.key = a.key.
-    // Calcite assigns the indices to the fields based on the order a and b are specified in
-    // the sql 'from' clause. Let's put the operand with smaller index in leftRef and larger
-    // index in rightRef so that the order of operands in the join condition is in the order
-    // the stream and table are specified in the 'from' clause.
+  /**
+   * Helper method to check if the join condition can be evaluated by the remote table.
+   * It does follow single path  using the index ref path checking if it is a simple reference all the way to table scan.
+   * In case any RexCall is encountered will stop an return null as a marker otherwise will return Column Name.
+   *
+   * @param inputRexIndex rex ref index
+   * @param relNode current Rel Node
+   * @return false if any Relational Expression is encountered on the path, true if is simple ref to __key__ column.
+   */
+  private static boolean isValidRemoteJoinRef(int inputRexIndex, RelNode relNode){
+    if (relNode instanceof TableScan) {
+      return relNode.getRowType().getFieldList().get(inputRexIndex).getName().equals(SamzaSqlRelMessage.KEY_NAME);
+    }
+    // has to be a single rel kind filter/project/table scan

Review comment:
       @atoomula This is hard to test here because we cannot yet handle any operator between the TableScan (TS) and the Join; the NPE would be hit first. I will add a test for this case later.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org