You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "initsun (Jira)" <ji...@apache.org> on 2020/07/04 04:30:00 UTC

[jira] [Comment Edited] (FLINK-18481) Kafka connector can't select data

    [ https://issues.apache.org/jira/browse/FLINK-18481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17151166#comment-17151166 ] 

initsun edited comment on FLINK-18481 at 7/4/20, 4:29 AM:
----------------------------------------------------------

{quote}StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
 EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
 StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
 String createA = "CREATE TABLE MyUserTable (\n" +
 " t1 STRING,\n" +
 " t2 INT\n" +
 ") WITH (\n" +
 " 'connector.type' = 'kafka', \n" +
 " 'connector.version' = '0.11',\n" +
 " 'connector.topic' = 'csvtb', \n" +
 " 'connector.properties.bootstrap.servers' = 'localhost:9092', \n" +
 " 'connector.startup-mode' = 'earliest-offset', \n" +
 " 'format.type' = 'csv'\n" +
 ")\n";
 bsTableEnv.executeSql(createA);
 TableResult insert = bsTableEnv.executeSql("INSERT INTO MyUserTable VALUES('test111',2)");
 insert.print();
 Thread.sleep(10000);
 TableResult tableResult = bsTableEnv.executeSql("SELECT t1,t2 FROM MyUserTable");
 tableResult.print();
{quote}
 

Still not


was (Author: init):
{quote}StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
String createA = "CREATE TABLE MyUserTable (\n" +
 " t1 STRING,\n" +
 " t2 INT\n" +
 ") WITH (\n" +
 " 'connector.type' = 'kafka', \n" +
 " 'connector.version' = '0.11',\n" +
 " 'connector.topic' = 'csvtb', \n" +
 " 'connector.properties.bootstrap.servers' = 'localhost:9092', \n" +
 " 'connector.startup-mode' = 'earliest-offset', \n" +
 " 'format.type' = 'csv'\n" +
 ")\n";
bsTableEnv.executeSql(createA);
TableResult insert = bsTableEnv.executeSql("INSERT INTO MyUserTable VALUES('test111',2)");
insert.print();
Thread.sleep(10000);
TableResult tableResult = bsTableEnv.executeSql("SELECT t1,t2 FROM MyUserTable");
tableResult.print();
{quote}

> Kafka connector can't select data
> ---------------------------------
>
>                 Key: FLINK-18481
>                 URL: https://issues.apache.org/jira/browse/FLINK-18481
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 1.12.0, 1.11.1
>            Reporter: initsun
>            Priority: Major
>
> When I use flnk1.11-snapshot or 1.12-snapshot, I use flinksql and Kafka connector, such as
> “EnvironmentSettings fsSettings = EnvironmentSettings.newInstance ().useOldPlanner().inStreamingMode().build();
> StreamExecutionEnvironment fsEnv = Stream ExecutionEnvironment.getExecutionEnvironment ();
> StreamTableEnvironment tableEnv = St reamTableEnvironment.create (fsEnv, fsSettings);
> String createA = "CREATE TABLE MyUserTable (\n" +
> " t1 STRING,\n" +
> " t2 INT\n" +
> ") WITH (\n" +
> " ' connector.type ' = 'kafka', \n" +
> " ' connector.version ' = '0.11',\n" +
> " ' connector.topic ' = 'csvtb', \n" +
> " ' connector.properties.bootstrap .servers' = ' localhost:9092 ', \n" +
> " ' connector.startup -mode' = 'earliest-offset', \n" +
> " ' format.type ' = 'csv'\n" +
> ")\n";
> tableEnv.executeSql (createA);
> TableResult insert = tableEnv.executeSql ("INSERT INTO MyUserTable VALUES('test',2)");
> insert.print ();
> TableResult tableResult = tableEnv.executeSql ("SELECT t1,t2 FROM MyUserTable");
> tableResult.print ();”
> This code can insert data into Kafka, but it can't output the result. Why, thank you



--
This message was sent by Atlassian Jira
(v8.3.4#803005)