Posted to dev@flink.apache.org by "Jark Wu (Jira)" <ji...@apache.org> on 2021/03/11 03:57:00 UTC
[jira] [Created] (FLINK-21725) DataTypeExtractor extracts wrong fields ordering for Tuple12
Jark Wu created FLINK-21725:
-------------------------------
Summary: DataTypeExtractor extracts wrong fields ordering for Tuple12
Key: FLINK-21725
URL: https://issues.apache.org/jira/browse/FLINK-21725
Project: Flink
Issue Type: Bug
Components: Table SQL / Planner
Affects Versions: 1.12.2, 1.11.3, 1.13.0
Reporter: Jark Wu
The following test can reproduce the problem:
{code:java}
/** Emit Tuple12 result. */
public static class JavaTableFuncTuple12
        extends TableFunction<
                Tuple12<
                        String,
                        String,
                        String,
                        String,
                        String,
                        String,
                        Integer,
                        Integer,
                        Integer,
                        Integer,
                        Integer,
                        Integer>> {
    private static final long serialVersionUID = -8258882510989374448L;

    public void eval(String str) {
        collect(
                Tuple12.of(
                        str + "_a",
                        str + "_b",
                        str + "_c",
                        str + "_d",
                        str + "_e",
                        str + "_f",
                        str.length(),
                        str.length() + 1,
                        str.length() + 2,
                        str.length() + 3,
                        str.length() + 4,
                        str.length() + 5));
    }
}
{code}
{code:scala}
@Test
def testCorrelateTuple12(): Unit = {
  val util = streamTestUtil()
  util.addTableSource[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
  val function = new JavaTableFuncTuple12
  util.addTemporarySystemFunction("func1", function)
  val sql =
    """
      |SELECT *
      |FROM MyTable, LATERAL TABLE(func1(c)) AS T
      |""".stripMargin

  util.verifyExecPlan(sql)
}
{code}
{code}
// output plan
Correlate(invocation=[func1($cor0.c)], correlate=[table(func1($cor0.c))], select=[a,b,c,f0,f1,f10,f11,f2,f3,f4,f5,f6,f7,f8,f9], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) f0, VARCHAR(2147483647) f1, INTEGER f10, INTEGER f11, VARCHAR(2147483647) f2, VARCHAR(2147483647) f3, VARCHAR(2147483647) f4, VARCHAR(2147483647) f5, INTEGER f6, INTEGER f7, INTEGER f8, INTEGER f9)], joinType=[INNER])
+- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
{code}
Note that the problem does not occur when the function is registered with the legacy {{tEnv.registerFunction}}, because that path uses {{TypeInformation}}. It does occur with {{tEnv.createTemporaryFunction}} or the {{CREATE FUNCTION}} syntax, because those paths use {{TypeInference}}.
Note that this problem exists in the latest 1.11 and 1.12 releases and on the master branch.
I think the problem might lie in this line: https://github.com/apache/flink/blob/c6997c97c575d334679915c328792b8a3067cfb5/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/DataTypeExtractor.java#L562
because it orders field names alphabetically, so {{f10}} and {{f11}} sort before {{f2}}, as seen in the output plan above.
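The suspected cause can be illustrated with a minimal, JDK-only sketch. It does not call {{DataTypeExtractor}}; the class {{FieldOrderDemo}} and its methods are hypothetical names used only for illustration. It shows that sorting the Tuple field names as plain strings yields the same ordering as the plan above, while sorting by the numeric suffix restores the declaration order.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Hypothetical demo class; not part of Flink. */
public class FieldOrderDemo {

    /** Builds the Tuple field names f0 .. f(arity-1) in declaration order. */
    public static List<String> tupleFieldNames(int arity) {
        List<String> names = new ArrayList<>();
        for (int i = 0; i < arity; i++) {
            names.add("f" + i);
        }
        return names;
    }

    /** Sorts the names as plain strings, which is the suspected behavior. */
    public static List<String> sortedAlphabetically(int arity) {
        List<String> names = tupleFieldNames(arity);
        names.sort(Comparator.naturalOrder());
        return names;
    }

    /** Re-sorts the alphabetical list by the numeric suffix after the leading 'f'. */
    public static List<String> sortedByPosition(int arity) {
        List<String> names = sortedAlphabetically(arity);
        names.sort(Comparator.comparingInt((String n) -> Integer.parseInt(n.substring(1))));
        return names;
    }

    public static void main(String[] args) {
        // String ordering places f10 and f11 between f1 and f2.
        System.out.println(sortedAlphabetically(12));
        // Numeric ordering gives back f0, f1, f2, ..., f11.
        System.out.println(sortedByPosition(12));
    }
}
```

For arity 10 or less the two orderings coincide, which would explain why the problem only shows up for Tuple11 and larger. Whether a fix should sort by index or simply preserve the declared field order is left open here.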