Posted to issues@flink.apache.org by "Timo Walther (Jira)" <ji...@apache.org> on 2020/07/28 07:13:00 UTC

[jira] [Commented] (FLINK-18724) Integration with DataStream and DataSet API report error

    [ https://issues.apache.org/jira/browse/FLINK-18724?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17166204#comment-17166204 ] 

Timo Walther commented on FLINK-18724:
--------------------------------------

[~dingliang] if you are using `StreamExecutionEnvironment` you can always call `env.execute()`. It seems your job gets submitted, so I guess the problem you are facing right now is more related to your Kafka connection? Btw, I would recommend using the user@ mailing list instead of JIRA. Could you please close this issue? Thanks.
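For reference, a minimal sketch of the call pattern described above (Flink 1.11, old planner; the bounded in-memory source and the column names are placeholders standing in for the reporter's Kafka consumer and `LogParser`, not taken from the actual job). In 1.11, `Table#execute()` submits its own job, so calling `env.execute()` or `tEnv.execute()` afterwards would submit a second job containing the same source:

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;

public class SingleExecuteSketch {

    // Builds the pipeline; Table#execute() below triggers the (single) job submission.
    public static List<Row> runPipeline() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env,
                EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build());

        // Placeholder bounded source standing in for the Kafka consumer + LogParser.
        DataStream<Tuple2<Integer, String>> stream =
                env.fromElements(Tuple2.of(1, "berlin"), Tuple2.of(2, "paris"));

        Table table = tEnv.fromDataStream(stream, "userId, city");

        // execute() submits the job and collect() pulls back the results;
        // do NOT also call env.execute() or tEnv.execute() afterwards.
        List<Row> rows = new ArrayList<>();
        try (CloseableIterator<Row> it = table.select("userId, city").execute().collect()) {
            it.forEachRemaining(rows::add);
        }
        return rows;
    }

    public static void main(String[] args) throws Exception {
        runPipeline().forEach(System.out::println);
    }
}
```

Conversely, if the sink side is pure DataStream (e.g. after `tEnv.toAppendStream(...)`), a single `env.execute()` is the right trigger instead.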

> Integration with DataStream and DataSet API report error 
> ---------------------------------------------------------
>
>                 Key: FLINK-18724
>                 URL: https://issues.apache.org/jira/browse/FLINK-18724
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka, Table SQL / API
>    Affects Versions: 1.11.1
>            Reporter: liang ding
>            Priority: Major
>
> I want to create a table from a DataStream (Kafka). There are two reasons I need to use the DataStream API:
>  # I need to decode messages into columns with a custom format, and in SQL mode I don't know how to do that.
>  # I have already implemented both a DeserializationSchema and a FlatMapFunction. With a DataStream I can do many things before it becomes a suitable table, which is my preferred approach in other applications as well.
>  So I do it like this:
> {code:java}
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> EnvironmentSettings tSet = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();
> StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, tSet);
> DataStream<MyRow> stream = env
>         .addSource(new FlinkKafkaConsumer<>("test-log", new SimpleStringSchema(), properties))
>         .flatMap(new LogParser());
> // stream.printToErr();
> tEnv.fromDataStream(stream).select("userId,city").execute().print();
> tEnv.execute("test-sql");
> // env.execute("test");
> {code}
> then I got message:
> {noformat}
>  [Kafka Fetcher for Source: Custom Source -> Flat Map ->* -> select: (userId,city) -> to: Row (3/3)] INFO org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=consumer-flink-3-5, groupId=flink-3] Node 0 sent an invalid full fetch response with extra=(test-log-0, response=(
>  [Kafka Fetcher for Source: Custom Source -> Flat Map ->* -> select: (userId,city) -> to: Row (3/3)] INFO org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=consumer-flink-3-5, groupId=flink-3] Node 0 sent an invalid full fetch response with extra=(test-log-1, response=({noformat}
> It seems like both StreamExecutionEnvironment and StreamTableEnvironment start the fetcher, and neither one succeeds.
> There is also no integration guide, which left me confused: should I call env.execute(),
>  tableEnv.execute(), or both (it seems not)? And what is the equivalent with the blink planner?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)