You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by go...@apache.org on 2022/09/14 02:19:23 UTC
[flink] branch release-1.16 updated: [FLINK-29113][table-planner] Throw exception when finding at least one invalid table name in join hints
This is an automated email from the ASF dual-hosted git repository.
godfrey pushed a commit to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.16 by this push:
new 0e02c082037 [FLINK-29113][table-planner] Throw exception when finding at least one invalid table name in join hints
0e02c082037 is described below
commit 0e02c082037979766b61da90e37f5fc555d71770
Author: xuyang <xy...@163.com>
AuthorDate: Mon Aug 29 18:21:39 2022 +0800
[FLINK-29113][table-planner] Throw exception when finding at least one invalid table name in join hints
This closes #20705
(cherry picked from commit d518086f475ec92a18592ec3c423bf6398e776cf)
---
.../planner/plan/optimize/JoinHintResolver.java | 113 ++++++++++++++++++---
.../planner/plan/hints/batch/JoinHintTestBase.java | 45 +++++++-
.../plan/stream/sql/join/LookupJoinTest.scala | 3 +-
3 files changed, 144 insertions(+), 17 deletions(-)
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/JoinHintResolver.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/JoinHintResolver.java
index ba11c29c1c9..36af173b1ae 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/JoinHintResolver.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/JoinHintResolver.java
@@ -34,10 +34,14 @@ import org.apache.calcite.rel.logical.LogicalJoin;
import org.apache.commons.lang3.StringUtils;
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import static java.util.Collections.singletonList;
@@ -52,6 +56,9 @@ public class JoinHintResolver extends RelShuttleImpl {
private final Set<RelHint> allHints = new HashSet<>();
private final Set<RelHint> validHints = new HashSet<>();
+ // hintName -> hintOptions -> whether this option has been checked
+ private final Map<String, Map<String, Boolean>> allOptionsInJoinHints = new HashMap<>();
+
/**
* Resolves and validates join hints in the given {@link RelNode} list, an {@link
* ValidationException} will be raised for invalid hints.
@@ -64,7 +71,7 @@ public class JoinHintResolver extends RelShuttleImpl {
* right side of this join, that means this join hint is invalid and a {@link
* ValidationException} will be thrown.
*/
- public List<RelNode> resolve(List<RelNode> roots) {
+ final List<RelNode> resolve(List<RelNode> roots) {
List<RelNode> resolvedRoots =
roots.stream().map(node -> node.accept(this)).collect(Collectors.toList());
validateHints();
@@ -96,13 +103,22 @@ public class JoinHintResolver extends RelShuttleImpl {
Configuration conf = Configuration.fromMap(hint.kvOptions);
// hint option checker has done the validation
String lookupTable = conf.get(LOOKUP_TABLE);
+
+ // add options about this hint for finally checking
+ initOptionInfoAboutJoinHintsForCheck(
+ hint.hintName, Collections.singletonList(lookupTable));
+
assert null != lookupTable;
if (rightName.isPresent() && matchIdentifier(lookupTable, rightName.get())) {
validHints.add(trimInheritPath(hint));
+ updateInfoForOptionCheck(hint.hintName, rightName);
newHints.add(hint);
}
} else if (JoinStrategy.isJoinStrategy(hint.hintName)) {
allHints.add(trimInheritPath(hint));
+ // add options about this hint for finally checking
+ initOptionInfoAboutJoinHintsForCheck(hint.hintName, hint.listOptions);
+
// the declared table name or query block name is replaced by
// JoinStrategy#LEFT_INPUT or JoinStrategy#RIGHT_INPUT
List<String> newOptions =
@@ -143,6 +159,11 @@ public class JoinHintResolver extends RelShuttleImpl {
Optional<String> rightName,
List<String> listOptions,
String hintName) {
+
+ // update info about 'allOptionsInJoinHints' for checking finally
+ updateInfoForOptionCheck(hintName, leftName);
+ updateInfoForOptionCheck(hintName, rightName);
+
return listOptions.stream()
.map(
option -> {
@@ -172,21 +193,57 @@ public class JoinHintResolver extends RelShuttleImpl {
private void validateHints() {
Set<RelHint> invalidHints = new HashSet<>(allHints);
invalidHints.removeAll(validHints);
+ String errorPattern;
+
+ // firstly, check the unknown table (view) names in hints
+ errorPattern =
+ "The options of following hints cannot match the name of "
+ + "input tables or views: %s";
+ StringBuilder errorMsgSb = new StringBuilder();
+ AtomicBoolean containsInvalidOptions = new AtomicBoolean(false);
+ for (String hintName : allOptionsInJoinHints.keySet()) {
+ Map<String, Boolean> optionCheckedStatus = allOptionsInJoinHints.get(hintName);
+ errorMsgSb.append("\n");
+ errorMsgSb.append(
+ String.format(
+ "`%s` in `%s`",
+ optionCheckedStatus.keySet().stream()
+ .filter(
+ op -> {
+ boolean checked = !optionCheckedStatus.get(op);
+ if (checked) {
+ containsInvalidOptions.set(true);
+ }
+ return checked;
+ })
+ .collect(Collectors.joining(", ")),
+ hintName));
+ }
+ if (containsInvalidOptions.get()) {
+ throw new ValidationException(String.format(errorPattern, errorMsgSb));
+ }
+
+ // secondly, check invalid hints.
+ // see more at JoinStrategy#validOptions
if (!invalidHints.isEmpty()) {
+ errorPattern = "The options of following hints is invalid: %s";
String errorMsg =
invalidHints.stream()
.map(
- hint ->
- hint.hintName
- + "("
- + StringUtils.join(hint.listOptions, ", ")
- + ")`")
- .collect(Collectors.joining("\n`", "\n`", ""));
- throw new ValidationException(
- String.format(
- "The options of following hints cannot match the name of "
- + "input tables or views: %s",
- errorMsg));
+ hint -> {
+ String hintName = hint.hintName;
+ if (JoinStrategy.isLookupHint(hintName)) {
+ // lookup join
+ return hint.hintName;
+ } else {
+ // join hint
+ return hint.hintName
+ + " :"
+ + StringUtils.join(hint.listOptions, ", ");
+ }
+ })
+ .collect(Collectors.joining("\n", "\n", ""));
+ throw new ValidationException(String.format(errorPattern, errorMsg));
}
}
@@ -252,4 +309,36 @@ public class JoinHintResolver extends RelShuttleImpl {
return true;
}
+
+ private void initOptionInfoAboutJoinHintsForCheck(String hintName, List<String> definedTables) {
+ if (allOptionsInJoinHints.containsKey(hintName)) {
+ Map<String, Boolean> optionCheckedStatus = allOptionsInJoinHints.get(hintName);
+ definedTables.forEach(
+ table -> {
+ if (!optionCheckedStatus.containsKey(table)) {
+ // all options are not checked when init
+ optionCheckedStatus.put(table, false);
+ }
+ });
+ } else {
+ allOptionsInJoinHints.put(
+ hintName,
+ new HashSet<>(definedTables)
+ .stream()
+ // all options are not checked when init
+ .collect(Collectors.toMap(table -> table, table -> false)));
+ }
+ }
+
+ private void updateInfoForOptionCheck(String hintName, Optional<String> tableName) {
+ if (tableName.isPresent()) {
+ Map<String, Boolean> optionMapper = allOptionsInJoinHints.get(hintName);
+ for (String option : optionMapper.keySet()) {
+ if (matchIdentifier(option, tableName.get())) {
+ // if the hint has not been checked before, update it
+ allOptionsInJoinHints.get(hintName).put(option, true);
+ }
+ }
+ }
+ }
}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/hints/batch/JoinHintTestBase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/hints/batch/JoinHintTestBase.java
index c20fe3eb2ff..cf569d5e8b7 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/hints/batch/JoinHintTestBase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/hints/batch/JoinHintTestBase.java
@@ -188,12 +188,50 @@ public abstract class JoinHintTestBase extends TableTestBase {
public void testJoinHintWithUnknownTable() {
thrown().expect(ValidationException.class);
thrown().expectMessage(
- "The options of following hints cannot match the name of input tables or views:");
+ String.format(
+ "The options of following hints cannot match the name of input tables or views: \n`%s` in `%s`",
+ "T99", getTestSingleJoinHint()));
String sql = "select /*+ %s(T99) */* from T1 join T2 on T1.a1 = T2.a2";
verifyRelPlanByCustom(String.format(sql, getTestSingleJoinHint()));
}
+ @Test
+ public void testJoinHintWithUnknownTableNameMixedWithValidTableNames1() {
+ thrown().expect(ValidationException.class);
+ thrown().expectMessage(
+ String.format(
+ "The options of following hints cannot match the name of input tables or views: \n`%s` in `%s`",
+ "T99", getTestSingleJoinHint()));
+ String sql = "select /*+ %s(T1, T99) */* from T1 join T2 on T1.a1 = T2.a2";
+
+ verifyRelPlanByCustom(String.format(sql, getTestSingleJoinHint()));
+ }
+
+ @Test
+ public void testJoinHintWithUnknownTableNameMixedWithValidTableNames2() {
+ thrown().expect(ValidationException.class);
+ thrown().expectMessage(
+ String.format(
+ "The options of following hints cannot match the name of input tables or views: \n`%s` in `%s`",
+ "T99", getTestSingleJoinHint()));
+ String sql = "select /*+ %s(T1, T99, T2) */* from T1 join T2 on T1.a1 = T2.a2";
+
+ verifyRelPlanByCustom(String.format(sql, getTestSingleJoinHint()));
+ }
+
+ @Test
+ public void testJoinHintWithMultiUnknownTableNamesMixedWithValidTableNames() {
+ thrown().expect(ValidationException.class);
+ thrown().expectMessage(
+ String.format(
+ "The options of following hints cannot match the name of input tables or views: \n`%s` in `%s`",
+ "T98, T99", getTestSingleJoinHint()));
+ String sql = "select /*+ %s(T1, T99, T98) */* from T1 join T2 on T1.a1 = T2.a2";
+
+ verifyRelPlanByCustom(String.format(sql, getTestSingleJoinHint()));
+ }
+
@Test
public void testJoinHintWithView() {
String sql = "select /*+ %s(V4) */* from T1 join V4 on T1.a1 = V4.a4";
@@ -206,9 +244,8 @@ public abstract class JoinHintTestBase extends TableTestBase {
thrown().expect(ValidationException.class);
thrown().expectMessage(
String.format(
- "The options of following hints cannot match the name of input tables or views: \n"
- + "`%s(V99)`",
- getTestSingleJoinHint()));
+ "The options of following hints cannot match the name of input tables or views: \n`%s` in `%s`",
+ "V99", getTestSingleJoinHint()));
String sql = "select /*+ %s(V99) */* from T1 join V4 on T1.a1 = V4.a4";
verifyRelPlanByCustom(String.format(sql, getTestSingleJoinHint()));
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.scala
index 469b74fa287..e60256580ec 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.scala
@@ -752,7 +752,8 @@ class LookupJoinTest(legacyTableSource: Boolean) extends TableTestBase with Seri
def testJoinHintWithTableAlias(): Unit = {
// TODO to be supported in FLINK-28850 (to make LogicalSnapshot Hintable)
thrown.expectMessage(
- "The options of following hints cannot match the name of input tables or views")
+ "The options of following hints cannot match the name of input tables or" +
+ " views: \n`D` in `LOOKUP`")
thrown.expect(classOf[ValidationException])
val sql = "SELECT /*+ LOOKUP('table'='D') */ * FROM MyTable AS T JOIN LookupTable " +
"FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id"