You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by go...@apache.org on 2022/09/14 02:19:23 UTC

[flink] branch release-1.16 updated: [FLINK-29113][table-planner] Throw exception when finding at least one invalid table name in join hints

This is an automated email from the ASF dual-hosted git repository.

godfrey pushed a commit to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.16 by this push:
     new 0e02c082037 [FLINK-29113][table-planner] Throw exception when finding at least one invalid table name in join hints
0e02c082037 is described below

commit 0e02c082037979766b61da90e37f5fc555d71770
Author: xuyang <xy...@163.com>
AuthorDate: Mon Aug 29 18:21:39 2022 +0800

    [FLINK-29113][table-planner] Throw exception when finding at least one invalid table name in join hints
    
    This closes #20705
    
    (cherry picked from commit d518086f475ec92a18592ec3c423bf6398e776cf)
---
 .../planner/plan/optimize/JoinHintResolver.java    | 113 ++++++++++++++++++---
 .../planner/plan/hints/batch/JoinHintTestBase.java |  45 +++++++-
 .../plan/stream/sql/join/LookupJoinTest.scala      |   3 +-
 3 files changed, 144 insertions(+), 17 deletions(-)

diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/JoinHintResolver.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/JoinHintResolver.java
index ba11c29c1c9..36af173b1ae 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/JoinHintResolver.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/JoinHintResolver.java
@@ -34,10 +34,14 @@ import org.apache.calcite.rel.logical.LogicalJoin;
 import org.apache.commons.lang3.StringUtils;
 
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 
 import static java.util.Collections.singletonList;
@@ -52,6 +56,9 @@ public class JoinHintResolver extends RelShuttleImpl {
     private final Set<RelHint> allHints = new HashSet<>();
     private final Set<RelHint> validHints = new HashSet<>();
 
+    // hintName -> hintOptions -> whether this option has been checked
+    private final Map<String, Map<String, Boolean>> allOptionsInJoinHints = new HashMap<>();
+
     /**
      * Resolves and validates join hints in the given {@link RelNode} list, an {@link
      * ValidationException} will be raised for invalid hints.
@@ -64,7 +71,7 @@ public class JoinHintResolver extends RelShuttleImpl {
      * right side of this join, that means this join hint is invalid and a {@link
      * ValidationException} will be thrown.
      */
-    public List<RelNode> resolve(List<RelNode> roots) {
+    final List<RelNode> resolve(List<RelNode> roots) {
         List<RelNode> resolvedRoots =
                 roots.stream().map(node -> node.accept(this)).collect(Collectors.toList());
         validateHints();
@@ -96,13 +103,22 @@ public class JoinHintResolver extends RelShuttleImpl {
                 Configuration conf = Configuration.fromMap(hint.kvOptions);
                 // hint option checker has done the validation
                 String lookupTable = conf.get(LOOKUP_TABLE);
+
+                // add options about this hint for finally checking
+                initOptionInfoAboutJoinHintsForCheck(
+                        hint.hintName, Collections.singletonList(lookupTable));
+
                 assert null != lookupTable;
                 if (rightName.isPresent() && matchIdentifier(lookupTable, rightName.get())) {
                     validHints.add(trimInheritPath(hint));
+                    updateInfoForOptionCheck(hint.hintName, rightName);
                     newHints.add(hint);
                 }
             } else if (JoinStrategy.isJoinStrategy(hint.hintName)) {
                 allHints.add(trimInheritPath(hint));
+                // add options about this hint for finally checking
+                initOptionInfoAboutJoinHintsForCheck(hint.hintName, hint.listOptions);
+
                 // the declared table name or query block name is replaced by
                 // JoinStrategy#LEFT_INPUT or JoinStrategy#RIGHT_INPUT
                 List<String> newOptions =
@@ -143,6 +159,11 @@ public class JoinHintResolver extends RelShuttleImpl {
             Optional<String> rightName,
             List<String> listOptions,
             String hintName) {
+
+        // update info about 'allOptionsInJoinHints' for checking finally
+        updateInfoForOptionCheck(hintName, leftName);
+        updateInfoForOptionCheck(hintName, rightName);
+
         return listOptions.stream()
                 .map(
                         option -> {
@@ -172,21 +193,57 @@ public class JoinHintResolver extends RelShuttleImpl {
     private void validateHints() {
         Set<RelHint> invalidHints = new HashSet<>(allHints);
         invalidHints.removeAll(validHints);
+        String errorPattern;
+
+        // firstly, check the unknown table (view) names in hints
+        errorPattern =
+                "The options of following hints cannot match the name of "
+                        + "input tables or views: %s";
+        StringBuilder errorMsgSb = new StringBuilder();
+        AtomicBoolean containsInvalidOptions = new AtomicBoolean(false);
+        for (String hintName : allOptionsInJoinHints.keySet()) {
+            Map<String, Boolean> optionCheckedStatus = allOptionsInJoinHints.get(hintName);
+            errorMsgSb.append("\n");
+            errorMsgSb.append(
+                    String.format(
+                            "`%s` in `%s`",
+                            optionCheckedStatus.keySet().stream()
+                                    .filter(
+                                            op -> {
+                                                boolean checked = !optionCheckedStatus.get(op);
+                                                if (checked) {
+                                                    containsInvalidOptions.set(true);
+                                                }
+                                                return checked;
+                                            })
+                                    .collect(Collectors.joining(", ")),
+                            hintName));
+        }
+        if (containsInvalidOptions.get()) {
+            throw new ValidationException(String.format(errorPattern, errorMsgSb));
+        }
+
+        // secondly, check invalid hints.
+        // see more at JoinStrategy#validOptions
         if (!invalidHints.isEmpty()) {
+            errorPattern = "The options of following hints is invalid: %s";
             String errorMsg =
                     invalidHints.stream()
                             .map(
-                                    hint ->
-                                            hint.hintName
-                                                    + "("
-                                                    + StringUtils.join(hint.listOptions, ", ")
-                                                    + ")`")
-                            .collect(Collectors.joining("\n`", "\n`", ""));
-            throw new ValidationException(
-                    String.format(
-                            "The options of following hints cannot match the name of "
-                                    + "input tables or views: %s",
-                            errorMsg));
+                                    hint -> {
+                                        String hintName = hint.hintName;
+                                        if (JoinStrategy.isLookupHint(hintName)) {
+                                            // lookup join
+                                            return hint.hintName;
+                                        } else {
+                                            // join hint
+                                            return hint.hintName
+                                                    + " :"
+                                                    + StringUtils.join(hint.listOptions, ", ");
+                                        }
+                                    })
+                            .collect(Collectors.joining("\n", "\n", ""));
+            throw new ValidationException(String.format(errorPattern, errorMsg));
         }
     }
 
@@ -252,4 +309,36 @@ public class JoinHintResolver extends RelShuttleImpl {
 
         return true;
     }
+
+    private void initOptionInfoAboutJoinHintsForCheck(String hintName, List<String> definedTables) {
+        if (allOptionsInJoinHints.containsKey(hintName)) {
+            Map<String, Boolean> optionCheckedStatus = allOptionsInJoinHints.get(hintName);
+            definedTables.forEach(
+                    table -> {
+                        if (!optionCheckedStatus.containsKey(table)) {
+                            // all options are not checked when init
+                            optionCheckedStatus.put(table, false);
+                        }
+                    });
+        } else {
+            allOptionsInJoinHints.put(
+                    hintName,
+                    new HashSet<>(definedTables)
+                            .stream()
+                                    // all options are not checked when init
+                                    .collect(Collectors.toMap(table -> table, table -> false)));
+        }
+    }
+
+    private void updateInfoForOptionCheck(String hintName, Optional<String> tableName) {
+        if (tableName.isPresent()) {
+            Map<String, Boolean> optionMapper = allOptionsInJoinHints.get(hintName);
+            for (String option : optionMapper.keySet()) {
+                if (matchIdentifier(option, tableName.get())) {
+                    // if the hint has not been checked before, update it
+                    allOptionsInJoinHints.get(hintName).put(option, true);
+                }
+            }
+        }
+    }
 }
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/hints/batch/JoinHintTestBase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/hints/batch/JoinHintTestBase.java
index c20fe3eb2ff..cf569d5e8b7 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/hints/batch/JoinHintTestBase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/hints/batch/JoinHintTestBase.java
@@ -188,12 +188,50 @@ public abstract class JoinHintTestBase extends TableTestBase {
     public void testJoinHintWithUnknownTable() {
         thrown().expect(ValidationException.class);
         thrown().expectMessage(
-                        "The options of following hints cannot match the name of input tables or views:");
+                        String.format(
+                                "The options of following hints cannot match the name of input tables or views: \n`%s` in `%s`",
+                                "T99", getTestSingleJoinHint()));
         String sql = "select /*+ %s(T99) */* from T1 join T2 on T1.a1 = T2.a2";
 
         verifyRelPlanByCustom(String.format(sql, getTestSingleJoinHint()));
     }
 
+    @Test
+    public void testJoinHintWithUnknownTableNameMixedWithValidTableNames1() {
+        thrown().expect(ValidationException.class);
+        thrown().expectMessage(
+                        String.format(
+                                "The options of following hints cannot match the name of input tables or views: \n`%s` in `%s`",
+                                "T99", getTestSingleJoinHint()));
+        String sql = "select /*+ %s(T1, T99) */* from T1 join T2 on T1.a1 = T2.a2";
+
+        verifyRelPlanByCustom(String.format(sql, getTestSingleJoinHint()));
+    }
+
+    @Test
+    public void testJoinHintWithUnknownTableNameMixedWithValidTableNames2() {
+        thrown().expect(ValidationException.class);
+        thrown().expectMessage(
+                        String.format(
+                                "The options of following hints cannot match the name of input tables or views: \n`%s` in `%s`",
+                                "T99", getTestSingleJoinHint()));
+        String sql = "select /*+ %s(T1, T99, T2) */* from T1 join T2 on T1.a1 = T2.a2";
+
+        verifyRelPlanByCustom(String.format(sql, getTestSingleJoinHint()));
+    }
+
+    @Test
+    public void testJoinHintWithMultiUnknownTableNamesMixedWithValidTableNames() {
+        thrown().expect(ValidationException.class);
+        thrown().expectMessage(
+                        String.format(
+                                "The options of following hints cannot match the name of input tables or views: \n`%s` in `%s`",
+                                "T98, T99", getTestSingleJoinHint()));
+        String sql = "select /*+ %s(T1, T99, T98) */* from T1 join T2 on T1.a1 = T2.a2";
+
+        verifyRelPlanByCustom(String.format(sql, getTestSingleJoinHint()));
+    }
+
     @Test
     public void testJoinHintWithView() {
         String sql = "select /*+ %s(V4) */* from T1 join V4 on T1.a1 = V4.a4";
@@ -206,9 +244,8 @@ public abstract class JoinHintTestBase extends TableTestBase {
         thrown().expect(ValidationException.class);
         thrown().expectMessage(
                         String.format(
-                                "The options of following hints cannot match the name of input tables or views: \n"
-                                        + "`%s(V99)`",
-                                getTestSingleJoinHint()));
+                                "The options of following hints cannot match the name of input tables or views: \n`%s` in `%s`",
+                                "V99", getTestSingleJoinHint()));
         String sql = "select /*+ %s(V99) */* from T1 join V4 on T1.a1 = V4.a4";
 
         verifyRelPlanByCustom(String.format(sql, getTestSingleJoinHint()));
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.scala
index 469b74fa287..e60256580ec 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.scala
@@ -752,7 +752,8 @@ class LookupJoinTest(legacyTableSource: Boolean) extends TableTestBase with Seri
   def testJoinHintWithTableAlias(): Unit = {
     // TODO to be supported in FLINK-28850 (to make LogicalSnapshot Hintable)
     thrown.expectMessage(
-      "The options of following hints cannot match the name of input tables or views")
+      "The options of following hints cannot match the name of input tables or" +
+        " views: \n`D` in `LOOKUP`")
     thrown.expect(classOf[ValidationException])
     val sql = "SELECT /*+ LOOKUP('table'='D') */ * FROM MyTable AS T JOIN LookupTable " +
       "FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id"