You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by Jim Chen <ch...@gmail.com> on 2020/07/06 02:19:11 UTC
flink1.10在通过TableFunction实现行转列时,Row一直是空
大家好:
我的业务场景,是想实现一个行转列的效果。然后通过自定义tableFunction来实现。
在自定义tableFunction的时候,如果SQL中传入的是ARRAY类型,
那么在eval方法接收到的就是Row[],
问题出在,Row[]中的数据获取不到,里面的元素都是NULL
通过下面的步骤和代码可还原车祸场景:
kafka topic: test_action
kafka message:
{"action": [{"actionID": "id001", "actionName": "aaa"}, {"actionID":
"id002", "actionName": "bbb"} ] }
代码1:Problem.java
package com.flink;
import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
/**
*
* 在自定义tableFunction的时候,如果SQL中传入的是ARRAY类型,
* 那么在eval方法接收到的就是Row[],
* 问题出在,Row[]中的数据获取不到,里面的元素都是NULL
*
* kafka topic: test_action
*
* kafka message:
* {"action": [{"actionID": "id001", "actionName": "aaa"}, {"actionID":
"id002", "actionName": "bbb"} ] }
*/
public class Problem {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();
StreamTableEnvironment bsEnv = StreamTableEnvironment.create(env,
envSettings);
bsEnv.registerFunction("explode2", new ExplodeFunction());
String ddlSource = "CREATE TABLE actionTable (\n" +
" action ARRAY<\n" +
" ROW<" +
" actionID STRING,\n" +
" actionName STRING\n" +
" >\n" +
" >\n" +
") WITH (\n" +
" 'connector.type' = 'kafka',\n" +
" 'connector.version' = '0.11',\n" +
" 'connector.topic' = 'test_action',\n" +
" 'connector.startup-mode' = 'earliest-offset',\n" +
" 'connector.properties.zookeeper.connect' =
'localhost:2181',\n" +
" 'connector.properties.bootstrap.servers' =
'localhost:9092',\n" +
" 'update-mode' = 'append',\n" +
" 'format.type' = 'json'\n" +
")";
bsEnv.sqlUpdate(ddlSource);
// Table table = bsEnv.sqlQuery("select `action` from actionTable");
Table table = bsEnv.sqlQuery("select * from actionTable, LATERAL
TABLE(explode2(`action`)) as T(`word`)");
table.printSchema();
bsEnv.toAppendStream(table, Row.class)
.print("==tb==");
bsEnv.execute("ARRAY tableFunction Problem");
}
}
代码2:ExplodeFunction.java
package com.flink;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;
import java.util.ArrayList;
import java.util.Arrays;
public class ExplodeFunction extends TableFunction<Row> {
public void eval(Row[] values) {
System.out.println(values.length);
if (values.length > 0) {
for (Row row : values) {
if (row != null) {// 这里debug出来的row总是空
ArrayList<Object> list = new ArrayList<>();
for (int i = 0; i < row.getArity(); i++) {
Object field = row.getField(i);
list.add(field);
}
collector.collect(Row.of(Arrays.toString(list.toArray())));
}
}
}
}
}
最后贴个debug的图
[image: image.png]
Re: flink1.10在通过TableFunction实现行转列时,Row一直是空
Posted by Jim Chen <ch...@gmail.com>.
Hi,
我现在转换思路,就是在定义表的时候,把ARRAY看成STRING,
那么,现在的问题,就是查询出来,都是空。
基于上面的代码环境,新写了一个类
Problem2.java
package com.flink;
import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
/**
*
* 在自定义tableFunction的时候,如果SQL中传入的是ARRAY类型,
* 那么在eval方法接收到的就是Row[],
* 问题出在,Row[]中的数据获取不到,里面的元素都是NULL
*
* 现在思路:就是在定义表的时候,把ARRAY看成STRING,
* 现在的问题,就是查询出来,都是空
*
* kafka topic: test_action
*
* kafka message:
* {"action": [{"actionID": "id001", "actionName": "aaa"}, {"actionID":
"id002", "actionName": "bbb"} ] }
*/
public class Problem2 {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();
StreamTableEnvironment bsEnv = StreamTableEnvironment.create(env,
envSettings);
bsEnv.registerFunction("explode3", new ExplodeFunction());
String ddlSource = "CREATE TABLE actionTable3 (\n" +
" action STRING\n" +
") WITH (\n" +
" 'connector.type' = 'kafka',\n" +
" 'connector.version' = '0.11',\n" +
" 'connector.topic' = 'test_action',\n" +
" 'connector.startup-mode' = 'earliest-offset',\n" +
" 'connector.properties.zookeeper.connect' =
'localhost:2181',\n" +
" 'connector.properties.bootstrap.servers' =
'localhost:9092',\n" +
" 'update-mode' = 'append',\n" +
" 'format.type' = 'json',\n" +
" 'format.derive-schema' = 'false',\n" +
" 'format.json-schema' = '{\"type\": \"object\",
\"properties\": {\"action\": {\"type\": \"string\"} } }'" +
")";
System.out.println(ddlSource);
bsEnv.sqlUpdate(ddlSource);
Table table = bsEnv.sqlQuery("select * from actionTable3");
// Table table = bsEnv.sqlQuery("select * from actionTable2, LATERAL
TABLE(explode3(`action`)) as T(`word`)");
table.printSchema();
bsEnv.toAppendStream(table, Row.class)
.print();// 输出都是空
bsEnv.execute("ARRAY tableFunction Problem");
}
}
Jark Wu <im...@gmail.com> 于2020年7月6日周一 上午10:36写道:
> Hi,
>
> 当前还不支持 Row[] 作为参数。目前有一个 issue 在解决这个问题,可以关注下。
> https://issues.apache.org/jira/browse/FLINK-17855
>
>
> Best,
> Jark
>
> On Mon, 6 Jul 2020 at 10:19, Jim Chen <ch...@gmail.com> wrote:
>
> > 大家好:
> > 我的业务场景,是想实现一个行转列的效果。然后通过自定义tableFunction来实现。
> > 在自定义tableFunction的时候,如果SQL中传入的是ARRAY类型,
> > 那么在eval方法接收到的就是Row[],
> > 问题出在,Row[]中的数据获取不到,里面的元素都是NULL
> >
> > 通过下面的步骤和代码可还原车祸场景:
> > kafka topic: test_action
> > kafka message:
> > {"action": [{"actionID": "id001", "actionName": "aaa"}, {"actionID":
> > "id002", "actionName": "bbb"} ] }
> >
> > 代码1:Problem.java
> > package com.flink;
> >
> > import
> > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> > import org.apache.flink.table.api.EnvironmentSettings;
> > import org.apache.flink.table.api.Table;
> > import org.apache.flink.table.api.java.StreamTableEnvironment;
> > import org.apache.flink.types.Row;
> >
> > /**
> > *
> > * 在自定义tableFunction的时候,如果SQL中传入的是ARRAY类型,
> > * 那么在eval方法接收到的就是Row[],
> > * 问题出在,Row[]中的数据获取不到,里面的元素都是NULL
> > *
> > * kafka topic: test_action
> > *
> > * kafka message:
> > * {"action": [{"actionID": "id001", "actionName": "aaa"}, {"actionID":
> > "id002", "actionName": "bbb"} ] }
> > */
> > public class Problem {
> >
> > public static void main(String[] args) throws Exception {
> > StreamExecutionEnvironment env =
> > StreamExecutionEnvironment.getExecutionEnvironment();
> > EnvironmentSettings envSettings =
> EnvironmentSettings.newInstance()
> > .useBlinkPlanner()
> > .inStreamingMode()
> > .build();
> > StreamTableEnvironment bsEnv = StreamTableEnvironment.create(env,
> > envSettings);
> > bsEnv.registerFunction("explode2", new ExplodeFunction());
> >
> > String ddlSource = "CREATE TABLE actionTable (\n" +
> > " action ARRAY<\n" +
> > " ROW<" +
> > " actionID STRING,\n" +
> > " actionName STRING\n" +
> > " >\n" +
> > " >\n" +
> > ") WITH (\n" +
> > " 'connector.type' = 'kafka',\n" +
> > " 'connector.version' = '0.11',\n" +
> > " 'connector.topic' = 'test_action',\n" +
> > " 'connector.startup-mode' = 'earliest-offset',\n" +
> > " 'connector.properties.zookeeper.connect' =
> > 'localhost:2181',\n" +
> > " 'connector.properties.bootstrap.servers' =
> > 'localhost:9092',\n" +
> > " 'update-mode' = 'append',\n" +
> > " 'format.type' = 'json'\n" +
> > ")";
> > bsEnv.sqlUpdate(ddlSource);
> >
> > // Table table = bsEnv.sqlQuery("select `action` from
> actionTable");
> > Table table = bsEnv.sqlQuery("select * from actionTable, LATERAL
> > TABLE(explode2(`action`)) as T(`word`)");
> > table.printSchema();
> > bsEnv.toAppendStream(table, Row.class)
> > .print("==tb==");
> >
> >
> > bsEnv.execute("ARRAY tableFunction Problem");
> > }
> > }
> >
> > 代码2:ExplodeFunction.java
> > package com.flink;
> >
> > import org.apache.flink.table.functions.TableFunction;
> > import org.apache.flink.types.Row;
> >
> > import java.util.ArrayList;
> > import java.util.Arrays;
> >
> > public class ExplodeFunction extends TableFunction<Row> {
> >
> > public void eval(Row[] values) {
> > System.out.println(values.length);
> > if (values.length > 0) {
> > for (Row row : values) {
> > if (row != null) {// 这里debug出来的row总是空
> > ArrayList<Object> list = new ArrayList<>();
> > for (int i = 0; i < row.getArity(); i++) {
> > Object field = row.getField(i);
> > list.add(field);
> > }
> >
> > collector.collect(Row.of(Arrays.toString(list.toArray())));
> > }
> > }
> > }
> > }
> > }
> >
> > 最后贴个debug的图
> > [image: image.png]
> >
>
Re: flink1.10在通过TableFunction实现行转列时,Row一直是空
Posted by Jark Wu <im...@gmail.com>.
Hi,
当前还不支持 Row[] 作为参数。目前有一个 issue 在解决这个问题,可以关注下。
https://issues.apache.org/jira/browse/FLINK-17855
Best,
Jark
On Mon, 6 Jul 2020 at 10:19, Jim Chen <ch...@gmail.com> wrote:
> 大家好:
> 我的业务场景,是想实现一个行转列的效果。然后通过自定义tableFunction来实现。
> 在自定义tableFunction的时候,如果SQL中传入的是ARRAY类型,
> 那么在eval方法接收到的就是Row[],
> 问题出在,Row[]中的数据获取不到,里面的元素都是NULL
>
> 通过下面的步骤和代码可还原车祸场景:
> kafka topic: test_action
> kafka message:
> {"action": [{"actionID": "id001", "actionName": "aaa"}, {"actionID":
> "id002", "actionName": "bbb"} ] }
>
> 代码1:Problem.java
> package com.flink;
>
> import
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.Table;
> import org.apache.flink.table.api.java.StreamTableEnvironment;
> import org.apache.flink.types.Row;
>
> /**
> *
> * 在自定义tableFunction的时候,如果SQL中传入的是ARRAY类型,
> * 那么在eval方法接收到的就是Row[],
> * 问题出在,Row[]中的数据获取不到,里面的元素都是NULL
> *
> * kafka topic: test_action
> *
> * kafka message:
> * {"action": [{"actionID": "id001", "actionName": "aaa"}, {"actionID":
> "id002", "actionName": "bbb"} ] }
> */
> public class Problem {
>
> public static void main(String[] args) throws Exception {
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
> .useBlinkPlanner()
> .inStreamingMode()
> .build();
> StreamTableEnvironment bsEnv = StreamTableEnvironment.create(env,
> envSettings);
> bsEnv.registerFunction("explode2", new ExplodeFunction());
>
> String ddlSource = "CREATE TABLE actionTable (\n" +
> " action ARRAY<\n" +
> " ROW<" +
> " actionID STRING,\n" +
> " actionName STRING\n" +
> " >\n" +
> " >\n" +
> ") WITH (\n" +
> " 'connector.type' = 'kafka',\n" +
> " 'connector.version' = '0.11',\n" +
> " 'connector.topic' = 'test_action',\n" +
> " 'connector.startup-mode' = 'earliest-offset',\n" +
> " 'connector.properties.zookeeper.connect' =
> 'localhost:2181',\n" +
> " 'connector.properties.bootstrap.servers' =
> 'localhost:9092',\n" +
> " 'update-mode' = 'append',\n" +
> " 'format.type' = 'json'\n" +
> ")";
> bsEnv.sqlUpdate(ddlSource);
>
> // Table table = bsEnv.sqlQuery("select `action` from actionTable");
> Table table = bsEnv.sqlQuery("select * from actionTable, LATERAL
> TABLE(explode2(`action`)) as T(`word`)");
> table.printSchema();
> bsEnv.toAppendStream(table, Row.class)
> .print("==tb==");
>
>
> bsEnv.execute("ARRAY tableFunction Problem");
> }
> }
>
> 代码2:ExplodeFunction.java
> package com.flink;
>
> import org.apache.flink.table.functions.TableFunction;
> import org.apache.flink.types.Row;
>
> import java.util.ArrayList;
> import java.util.Arrays;
>
> public class ExplodeFunction extends TableFunction<Row> {
>
> public void eval(Row[] values) {
> System.out.println(values.length);
> if (values.length > 0) {
> for (Row row : values) {
> if (row != null) {// 这里debug出来的row总是空
> ArrayList<Object> list = new ArrayList<>();
> for (int i = 0; i < row.getArity(); i++) {
> Object field = row.getField(i);
> list.add(field);
> }
>
> collector.collect(Row.of(Arrays.toString(list.toArray())));
> }
> }
> }
> }
> }
>
> 最后贴个debug的图
> [image: image.png]
>