You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Jim Chen <ch...@gmail.com> on 2020/07/06 05:28:10 UTC
HELP ! ! ! When using Flink1.10 to define table with the string type,
the query result is null
Hi, everyone!
When I use Flink 1.10 to define a table, I want to declare a field that holds a JSON array
as the STRING type. But the query result is null when I execute the program.
The detailed code is as follows:
package com.flink;
import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
/**
* kafka topic: test_action
*
* kafka message:
* {"action": [{"actionID": "id001", "actionName": "aaa"}, {"actionID":
"id002", "actionName": "bbb"} ] }
*/
public class Problem2 {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();
StreamTableEnvironment bsEnv = StreamTableEnvironment.create(env,
envSettings);
// bsEnv.registerFunction("explode3", new ExplodeFunction());
String ddlSource = "CREATE TABLE actionTable3 (\n" +
" action STRING\n" +
") WITH (\n" +
" 'connector.type' = 'kafka',\n" +
" 'connector.version' = '0.11',\n" +
" 'connector.topic' = 'test_action',\n" +
" 'connector.startup-mode' = 'earliest-offset',\n" +
" 'connector.properties.zookeeper.connect' =
'localhost:2181',\n" +
" 'connector.properties.bootstrap.servers' =
'localhost:9092',\n" +
" 'update-mode' = 'append',\n" +
" 'format.type' = 'json',\n" +
// " 'format.derive-schema' = 'true',\n" +
" 'format.json-schema' = '{\"type\": \"object\",
\"properties\": {\"action\": {\"type\": \"string\"} } }'" +
")";
System.out.println(ddlSource);
bsEnv.sqlUpdate(ddlSource);
Table table = bsEnv.sqlQuery("select * from actionTable3");
// Table table = bsEnv.sqlQuery("select * from actionTable2, LATERAL
TABLE(explode3(`action`)) as T(`word`)");
table.printSchema();
bsEnv.toAppendStream(table, Row.class)
.print();// the result is null
bsEnv.execute("ARRAY tableFunction Problem");
}
}
Re: HELP ! ! ! When using Flink1.10 to define table with the string
type, the query result is null
Posted by Benchao Li <li...@apache.org>.
Hi Jim,
This is a known issue [1]. Could you check whether that issue covers your
requirements?
[1] https://issues.apache.org/jira/browse/FLINK-18002
Jim Chen <ch...@gmail.com> 于2020年7月6日周一 下午1:28写道:
> Hi, everyone!
>
> When i use flink1.10 to define table, and i want to define the json array
> as the string type. But the query resutl is null when i execute the program.
> The detail code as follow:
>
> package com.flink;
>
> import
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.Table;
> import org.apache.flink.table.api.java.StreamTableEnvironment;
> import org.apache.flink.types.Row;
>
> /**
> * kafka topic: test_action
> *
> * kafka message:
> * {"action": [{"actionID": "id001", "actionName": "aaa"}, {"actionID":
> "id002", "actionName": "bbb"} ] }
> */
> public class Problem2 {
>
> public static void main(String[] args) throws Exception {
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
> .useBlinkPlanner()
> .inStreamingMode()
> .build();
> StreamTableEnvironment bsEnv = StreamTableEnvironment.create(env,
> envSettings);
> // bsEnv.registerFunction("explode3", new ExplodeFunction());
>
> String ddlSource = "CREATE TABLE actionTable3 (\n" +
> " action STRING\n" +
> ") WITH (\n" +
> " 'connector.type' = 'kafka',\n" +
> " 'connector.version' = '0.11',\n" +
> " 'connector.topic' = 'test_action',\n" +
> " 'connector.startup-mode' = 'earliest-offset',\n" +
> " 'connector.properties.zookeeper.connect' =
> 'localhost:2181',\n" +
> " 'connector.properties.bootstrap.servers' =
> 'localhost:9092',\n" +
> " 'update-mode' = 'append',\n" +
> " 'format.type' = 'json',\n" +
> // " 'format.derive-schema' = 'true',\n" +
> " 'format.json-schema' = '{\"type\": \"object\",
> \"properties\": {\"action\": {\"type\": \"string\"} } }'" +
> ")";
> System.out.println(ddlSource);
> bsEnv.sqlUpdate(ddlSource);
>
> Table table = bsEnv.sqlQuery("select * from actionTable3");
> // Table table = bsEnv.sqlQuery("select * from actionTable2,
> LATERAL TABLE(explode3(`action`)) as T(`word`)");
> table.printSchema();
> bsEnv.toAppendStream(table, Row.class)
> .print();// the result is null
>
> bsEnv.execute("ARRAY tableFunction Problem");
> }
> }
>
--
Best,
Benchao Li