You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2020/02/04 08:43:20 UTC

[flink] branch release-1.10 updated: [FLINK-15658][table-planner-blink] Fix duplicate field names exception when join on the same key multiple times (#11011)

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.10 by this push:
     new c84b754  [FLINK-15658][table-planner-blink] Fix duplicate field names exception when join on the same key multiple times (#11011)
c84b754 is described below

commit c84b754b60e62f106adda47e91bfeec5ae5edeb5
Author: Jark Wu <ja...@apache.org>
AuthorDate: Tue Feb 4 16:39:01 2020 +0800

    [FLINK-15658][table-planner-blink] Fix duplicate field names exception when join on the same key multiple times (#11011)
---
 .../flink/table/planner/plan/utils/KeySelectorUtil.java      | 12 ++++++------
 .../flink/table/planner/runtime/stream/sql/JoinITCase.scala  | 12 ++++++++++++
 2 files changed, 18 insertions(+), 6 deletions(-)

diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/utils/KeySelectorUtil.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/utils/KeySelectorUtil.java
index 5911abe..76934bf 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/utils/KeySelectorUtil.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/utils/KeySelectorUtil.java
@@ -44,20 +44,20 @@ public class KeySelectorUtil {
 	public static BaseRowKeySelector getBaseRowSelector(int[] keyFields, BaseRowTypeInfo rowType) {
 		if (keyFields.length > 0) {
 			LogicalType[] inputFieldTypes = rowType.getLogicalTypes();
-			String[] inputFieldNames = rowType.getFieldNames();
 			LogicalType[] keyFieldTypes = new LogicalType[keyFields.length];
-			String[] keyFieldNames = new String[keyFields.length];
 			for (int i = 0; i < keyFields.length; ++i) {
 				keyFieldTypes[i] = inputFieldTypes[keyFields[i]];
-				keyFieldNames[i] = inputFieldNames[keyFields[i]];
 			}
-			RowType returnType = RowType.of(keyFieldTypes, keyFieldNames);
-			RowType inputType = RowType.of(inputFieldTypes, rowType.getFieldNames());
+			// do not provide field names for the result key type,
+			// because we may have duplicate key fields and the field names may conflict
+			RowType returnType = RowType.of(keyFieldTypes);
+			RowType inputType = rowType.toRowType();
 			GeneratedProjection generatedProjection = ProjectionCodeGenerator.generateProjection(
 				CodeGeneratorContext.apply(new TableConfig()),
 				"KeyProjection",
 				inputType,
-				returnType, keyFields);
+				returnType,
+				keyFields);
 			BaseRowTypeInfo keyRowType = BaseRowTypeInfo.of(returnType);
 			return new BinaryRowKeySelector(keyRowType, generatedProjection);
 		} else {
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/JoinITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/JoinITCase.scala
index f2a3131..36e8a45 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/JoinITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/JoinITCase.scala
@@ -232,6 +232,18 @@ class JoinITCase(state: StateBackendMode) extends StreamingWithStateTestBase(sta
   }
 
   @Test
+  def testInnerJoinWithDuplicateKey(): Unit = {
+    val query = "SELECT a1, b1, b3 FROM A JOIN B ON a1 = b1 AND a1 = b3"
+
+    val sink = new TestingRetractSink
+    tEnv.sqlQuery(query).toRetractStream[Row].addSink(sink).setParallelism(1)
+    env.execute()
+
+    val expected = Seq("2,2,2", "3,3,3")
+    assertEquals(expected.sorted, sink.getRetractResults.sorted)
+  }
+
+  @Test
   def testInnerJoinWithNonEquiJoinPredicate(): Unit = {
     val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE b = e AND a < 6 AND h < b"