You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/06/30 06:20:49 UTC

[GitHub] [flink] fsk119 commented on a diff in pull request #18920: [FLINK-26361][hive] Create LogicalFilter with CorrelationId to fix "unexpected correlate variable" exception when using Hive dialect

fsk119 commented on code in PR #18920:
URL: https://github.com/apache/flink/pull/18920#discussion_r910647254


##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/copy/HiveParserBaseSemanticAnalyzer.java:
##########
@@ -1862,6 +1867,65 @@ public static RelNode genValues(
                 rows);
     }
 
+    // traverse the given node to find all correlated variables
+    public static Set<CorrelationId> getVariablesSet(RexNode rexNode) {
+        Set<CorrelationId> correlationVariables = new HashSet<>();
+        if (rexNode instanceof RexSubQuery) {
+            RexSubQuery rexSubQuery = (RexSubQuery) rexNode;
+            // we expect correlated variables in Filter only for now.
+            // also check case where operator has o inputs .e.g TableScan
+            if (rexSubQuery.rel.getInputs().isEmpty()) {
+                return correlationVariables;
+            }
+            RelNode input = rexSubQuery.rel.getInput(0);
+            while (input != null
+                    && !(input instanceof LogicalFilter)
+                    && input.getInputs().size() >= 1) {
+                // we don't expect corr vars within UNION for now
+                if (input.getInputs().size() > 1) {
+                    if (input instanceof LogicalJoin) {
+                        correlationVariables.addAll(
+                                findCorrelatedVar(((LogicalJoin) input).getCondition()));
+                    }
+                    return correlationVariables;

Review Comment:
   It's better if we can add the FLINK-XXXX to fix the problem.



##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/copy/HiveParserBaseSemanticAnalyzer.java:
##########
@@ -1862,6 +1868,70 @@ public static RelNode genValues(
                 rows);
     }
 
+    /**
+     * traverse the given node to find all correlated variables, the main logic is from {@link
+     * HiveFilter#getVariablesSet()}.
+     */
+    public static Set<CorrelationId> getVariablesSetForFilter(RexNode rexNode) {

Review Comment:
   After reading the code in `SqlToRelConverter#getCorrelationUse`, I find the code is buggy here. We should only add the correlation id on the filter when we can resolve it in the current scope. But the code here will add the correlation id if it is a subquery and its condition contains the correlated variables.  In Calcite, for the query
   ```
   select * from x x1 
   where x1.a in (
     select x2.a from x x2
     where exists (
        select x3.a from x x3 where x3.a = x1.a
     )
   );
   ```
   
   has the RelNode tree:
   ```
   LogicalProject(inputs=[0..1])
   +- LogicalFilter(condition=[IN($0, {
   LogicalProject(a=[$0])
     LogicalFilter(condition=[EXISTS({
   LogicalFilter(condition=[=($0, $cor0.a)])
     LogicalTableScan(table=[[default_catalog, default_database, x, source: [TestTableSource(a, b)]]])
   })])
       LogicalTableScan(table=[[default_catalog, default_database, x, source: [TestTableSource(a, b)]]])
   })], variablesSet=[[$cor0]])
      +- LogicalTableScan(table=[[default_catalog, default_database, x, source: [TestTableSource(a, b)]]])
   ```
   where only the out-most LogicalFilter contains correlation id.
   
   Considering we just align with the Hive behavior, it's fine we don't support this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org