You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2020/02/04 08:43:20 UTC
[flink] branch release-1.10 updated:
[FLINK-15658][table-planner-blink] Fix duplicate field names exception when
join on the same key multiple times (#11011)
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.10 by this push:
new c84b754 [FLINK-15658][table-planner-blink] Fix duplicate field names exception when join on the same key multiple times (#11011)
c84b754 is described below
commit c84b754b60e62f106adda47e91bfeec5ae5edeb5
Author: Jark Wu <ja...@apache.org>
AuthorDate: Tue Feb 4 16:39:01 2020 +0800
[FLINK-15658][table-planner-blink] Fix duplicate field names exception when join on the same key multiple times (#11011)
---
.../flink/table/planner/plan/utils/KeySelectorUtil.java | 12 ++++++------
.../flink/table/planner/runtime/stream/sql/JoinITCase.scala | 12 ++++++++++++
2 files changed, 18 insertions(+), 6 deletions(-)
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/utils/KeySelectorUtil.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/utils/KeySelectorUtil.java
index 5911abe..76934bf 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/utils/KeySelectorUtil.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/utils/KeySelectorUtil.java
@@ -44,20 +44,20 @@ public class KeySelectorUtil {
public static BaseRowKeySelector getBaseRowSelector(int[] keyFields, BaseRowTypeInfo rowType) {
if (keyFields.length > 0) {
LogicalType[] inputFieldTypes = rowType.getLogicalTypes();
- String[] inputFieldNames = rowType.getFieldNames();
LogicalType[] keyFieldTypes = new LogicalType[keyFields.length];
- String[] keyFieldNames = new String[keyFields.length];
for (int i = 0; i < keyFields.length; ++i) {
keyFieldTypes[i] = inputFieldTypes[keyFields[i]];
- keyFieldNames[i] = inputFieldNames[keyFields[i]];
}
- RowType returnType = RowType.of(keyFieldTypes, keyFieldNames);
- RowType inputType = RowType.of(inputFieldTypes, rowType.getFieldNames());
+ // do not provide field names for the result key type,
+ // because we may have duplicate key fields and the field names may conflict
+ RowType returnType = RowType.of(keyFieldTypes);
+ RowType inputType = rowType.toRowType();
GeneratedProjection generatedProjection = ProjectionCodeGenerator.generateProjection(
CodeGeneratorContext.apply(new TableConfig()),
"KeyProjection",
inputType,
- returnType, keyFields);
+ returnType,
+ keyFields);
BaseRowTypeInfo keyRowType = BaseRowTypeInfo.of(returnType);
return new BinaryRowKeySelector(keyRowType, generatedProjection);
} else {
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/JoinITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/JoinITCase.scala
index f2a3131..36e8a45 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/JoinITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/JoinITCase.scala
@@ -232,6 +232,18 @@ class JoinITCase(state: StateBackendMode) extends StreamingWithStateTestBase(sta
}
@Test
+ def testInnerJoinWithDuplicateKey(): Unit = {
+ val query = "SELECT a1, b1, b3 FROM A JOIN B ON a1 = b1 AND a1 = b3"
+
+ val sink = new TestingRetractSink
+ tEnv.sqlQuery(query).toRetractStream[Row].addSink(sink).setParallelism(1)
+ env.execute()
+
+ val expected = Seq("2,2,2", "3,3,3")
+ assertEquals(expected.sorted, sink.getRetractResults.sorted)
+ }
+
+ @Test
def testInnerJoinWithNonEquiJoinPredicate(): Unit = {
val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE b = e AND a < 6 AND h < b"