You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by "799590989@qq.com.INVALID" <79...@qq.com.INVALID> on 2022/03/12 06:51:02 UTC
flink1.13.6用tableApi的方式从kafka写入hive失败,也不报错,但是写入文件或者mysql没有问题
软件版本
flink:1.13.6
hive:1.1.1
hadoop:2.6.0-cdh5.16.2
通过createRemoteEnvironment方式将sql提交到远程flink集群操作的,hive连接时通过了Kerberos认证。代码如下,
运行后不报错,flink集群能看到job正在运行,kafka和hive表都创建成功了,kafka中一直在产生新的消息,而hive中却没有新数据进表中。
测试过将输出改为mysql或者csv文件都能持续产生新记录,hive表中的数据也能读取出来,或者从hive的一张表将数据插入刚刚创建的表中也是成功的。就是不知道为什么不能将kafka的动态数据动态写入新建的hive表中。
String KRB5_CONF_PATH = "/home/tetris/conf/krb5.ini";
String keytab = "/home/tetris/conf/company.keytab";
String principal = "company";
System.setProperty("java.security.krb5.conf", KRB5_CONF_PATH);
Configuration configuration = new Configuration();
configuration.set("hadoop.security.authentication", "kerberos");
configuration.set("keytab.file", keytab);
configuration.setBoolean("hadoop.security.authorization", true);
configuration.set("kerberos.principal", principal);
UserGroupInformation.setConfiguration(configuration);
UserGroupInformation.loginUserFromKeytab(principal, keytab);
EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("node1", 8081);
StreamTableEnvironment flinkTableEnv = StreamTableEnvironment.create(env,bsSettings);
HiveCatalog hiveCatalog = new HiveCatalog("myhive", "tetris", "/home/tetris/conf", "1.1.1");
flinkTableEnv.registerCatalog("myhive",hiveCatalog);
flinkTableEnv.useCatalog("myhive");
flinkTableEnv.executeSql("DROP TABLE IF EXISTS data_2431_4928").print();
flinkTableEnv.executeSql("CREATE TABLE data_2431_4928(id STRING,user_id STRING,status STRING) WITH (\n" +
"'connector' = 'kafka',\n" +
"'topic' = 'person',\n" +
"'properties.bootstrap.servers' = '121.4.89.228:9092',\n" +
"'properties.group.id' = 'testGroup',\n" +
"'scan.startup.mode' = 'latest-offset',\n" +
"'format' = 'json',\n" +
"'json.fail-on-missing-field'='false','json.ignore-parse-errors'='true'\n" +
")").print();
flinkTableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
flinkTableEnv.executeSql("DROP TABLE IF EXISTS output_2431_4930").print();
flinkTableEnv.executeSql("CREATE TABLE output_2431_4930(id STRING,user_id STRING,status STRING)").print();
flinkTableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
TableResult result = flinkTableEnv.executeSql("INSERT INTO output_2431_4930 SELECT id, user_id ,`status` FROM data_2431_4928");
System.out.println(result.getJobClient().get().getJobID());
谌祥,杭州 - java后端开发 - 大数据方向
799590989@qq.com
Re: Re: flink1.13.6用tableApi的方式从kafka写入hive失败,也不报错,但是写入文件或者mysql没有问题
Posted by "799590989@qq.com.INVALID" <79...@qq.com.INVALID>.
测试了,确实是没有开启checkpoint的原因,刚刚在集群开启了checkpoint存储,代码里面开启了checkpoint的配置,hive中能新增kafka表的数据了。
感谢感谢!
谌祥,杭州 - java后端开发 - 大数据方向
799590989@qq.com
发件人: Caizhi Weng
发送时间: 2022-03-14 10:15
收件人: flink中文邮件组
主题: Re: flink1.13.6用tableApi的方式从kafka写入hive失败,也不报错,但是写入文件或者mysql没有问题
Hi!
流作业写入 hive 的数据需要在 checkpoint 之后才可见。我看你的代码里没有启用 checkpoint,可以试一下启用
checkpoint。
jdbc sink 是一个写入即可见的 sink,但它只能保证最终一致性。也就是说如果中途作业出现 failover,那么从上一次
checkpoint 到 failover 之间写入 jdbc sink 的数据就是“冗余”的,要被作业重启后的新数据覆盖才能回归到一致性。
filesystem sink 写入的时候应该创建的是一个临时文件,filesystem source 是不会读这个文件的,只有 checkpoint
之后才会把临时文件重命名。
799590989@qq.com.INVALID <79...@qq.com.invalid> 于2022年3月12日周六 14:51写道:
>
> 软件版本
> flink:1.13.6
> hive:1.1.1
> hadoop:2.6.0-cdh5.16.2
>
> 通过createRemoteEnvironment方式将sql提交到远程flink集群操作的,hive连接时通过了Kerberos认证。代码如下,
>
> 运行后不报错,flink集群能看到job正在运行,kafka和hive表都创建成功了,kafka中一直在产生新的消息,而hive中却没有新数据进表中。
>
>
> 测试过将输出改为mysql或者csv文件都能持续产生新记录,hive表中的数据也能读取出来,或者从hive的一张表将数据插入刚刚创建的表中也是成功的。就是不知道为什么不能将kafka的动态数据动态写入新建的hive表中。
>
> String KRB5_CONF_PATH = "/home/tetris/conf/krb5.ini";
> String keytab = "/home/tetris/conf/company.keytab";
> String principal = "company";
> System.setProperty("java.security.krb5.conf", KRB5_CONF_PATH);
>
> Configuration configuration = new Configuration();
> configuration.set("hadoop.security.authentication", "kerberos");
> configuration.set("keytab.file", keytab);
> configuration.setBoolean("hadoop.security.authorization", true);
> configuration.set("kerberos.principal", principal);
> UserGroupInformation.setConfiguration(configuration);
> UserGroupInformation.loginUserFromKeytab(principal, keytab);
>
> EnvironmentSettings bsSettings =
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.createRemoteEnvironment("node1", 8081);
> StreamTableEnvironment flinkTableEnv =
> StreamTableEnvironment.create(env,bsSettings);
>
> HiveCatalog hiveCatalog = new HiveCatalog("myhive", "tetris",
> "/home/tetris/conf", "1.1.1");
> flinkTableEnv.registerCatalog("myhive",hiveCatalog);
> flinkTableEnv.useCatalog("myhive");
> flinkTableEnv.executeSql("DROP TABLE IF EXISTS data_2431_4928").print();
> flinkTableEnv.executeSql("CREATE TABLE data_2431_4928(id STRING,user_id
> STRING,status STRING) WITH (\n" +
> "'connector' = 'kafka',\n" +
> "'topic' = 'person',\n" +
> "'properties.bootstrap.servers' = '121.4.89.228:9092',\n" +
> "'properties.group.id' = 'testGroup',\n" +
> "'scan.startup.mode' = 'latest-offset',\n" +
> "'format' = 'json',\n" +
>
> "'json.fail-on-missing-field'='false','json.ignore-parse-errors'='true'\n" +
> ")").print();
> flinkTableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
> flinkTableEnv.executeSql("DROP TABLE IF EXISTS output_2431_4930").print();
> flinkTableEnv.executeSql("CREATE TABLE output_2431_4930(id STRING,user_id
> STRING,status STRING)").print();
> flinkTableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
> TableResult result = flinkTableEnv.executeSql("INSERT INTO
> output_2431_4930 SELECT id, user_id ,`status` FROM data_2431_4928");
> System.out.println(result.getJobClient().get().getJobID());
>
>
>
> 谌祥,杭州 - java后端开发 - 大数据方向
> 799590989@qq.com
>
Re: flink1.13.6用tableApi的方式从kafka写入hive失败,也不报错,但是写入文件或者mysql没有问题
Posted by Caizhi Weng <ts...@gmail.com>.
Hi!
流作业写入 hive 的数据需要在 checkpoint 之后才可见。我看你的代码里没有启用 checkpoint,可以试一下启用
checkpoint。
jdbc sink 是一个写入即可见的 sink,但它只能保证最终一致性。也就是说如果中途作业出现 failover,那么从上一次
checkpoint 到 failover 之间写入 jdbc sink 的数据就是“冗余”的,要被作业重启后的新数据覆盖才能回归到一致性。
filesystem sink 写入的时候应该创建的是一个临时文件,filesystem source 是不会读这个文件的,只有 checkpoint
之后才会把临时文件重命名。
799590989@qq.com.INVALID <79...@qq.com.invalid> 于2022年3月12日周六 14:51写道:
>
> 软件版本
> flink:1.13.6
> hive:1.1.1
> hadoop:2.6.0-cdh5.16.2
>
> 通过createRemoteEnvironment方式将sql提交到远程flink集群操作的,hive连接时通过了Kerberos认证。代码如下,
>
> 运行后不报错,flink集群能看到job正在运行,kafka和hive表都创建成功了,kafka中一直在产生新的消息,而hive中却没有新数据进表中。
>
>
> 测试过将输出改为mysql或者csv文件都能持续产生新记录,hive表中的数据也能读取出来,或者从hive的一张表将数据插入刚刚创建的表中也是成功的。就是不知道为什么不能将kafka的动态数据动态写入新建的hive表中。
>
> String KRB5_CONF_PATH = "/home/tetris/conf/krb5.ini";
> String keytab = "/home/tetris/conf/company.keytab";
> String principal = "company";
> System.setProperty("java.security.krb5.conf", KRB5_CONF_PATH);
>
> Configuration configuration = new Configuration();
> configuration.set("hadoop.security.authentication", "kerberos");
> configuration.set("keytab.file", keytab);
> configuration.setBoolean("hadoop.security.authorization", true);
> configuration.set("kerberos.principal", principal);
> UserGroupInformation.setConfiguration(configuration);
> UserGroupInformation.loginUserFromKeytab(principal, keytab);
>
> EnvironmentSettings bsSettings =
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.createRemoteEnvironment("node1", 8081);
> StreamTableEnvironment flinkTableEnv =
> StreamTableEnvironment.create(env,bsSettings);
>
> HiveCatalog hiveCatalog = new HiveCatalog("myhive", "tetris",
> "/home/tetris/conf", "1.1.1");
> flinkTableEnv.registerCatalog("myhive",hiveCatalog);
> flinkTableEnv.useCatalog("myhive");
> flinkTableEnv.executeSql("DROP TABLE IF EXISTS data_2431_4928").print();
> flinkTableEnv.executeSql("CREATE TABLE data_2431_4928(id STRING,user_id
> STRING,status STRING) WITH (\n" +
> "'connector' = 'kafka',\n" +
> "'topic' = 'person',\n" +
> "'properties.bootstrap.servers' = '121.4.89.228:9092',\n" +
> "'properties.group.id' = 'testGroup',\n" +
> "'scan.startup.mode' = 'latest-offset',\n" +
> "'format' = 'json',\n" +
>
> "'json.fail-on-missing-field'='false','json.ignore-parse-errors'='true'\n" +
> ")").print();
> flinkTableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
> flinkTableEnv.executeSql("DROP TABLE IF EXISTS output_2431_4930").print();
> flinkTableEnv.executeSql("CREATE TABLE output_2431_4930(id STRING,user_id
> STRING,status STRING)").print();
> flinkTableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
> TableResult result = flinkTableEnv.executeSql("INSERT INTO
> output_2431_4930 SELECT id, user_id ,`status` FROM data_2431_4928");
> System.out.println(result.getJobClient().get().getJobID());
>
>
>
> 谌祥,杭州 - java后端开发 - 大数据方向
> 799590989@qq.com
>