Posted to issues@flink.apache.org by "yantao (Jira)" <ji...@apache.org> on 2022/06/08 04:06:00 UTC
[jira] [Created] (FLINK-27950) Flink streaming write to Hive table: configured file extension parameter has no effect
yantao created FLINK-27950:
------------------------------
Summary: Flink streaming write to Hive table: configured file extension parameter has no effect
Key: FLINK-27950
URL: https://issues.apache.org/jira/browse/FLINK-27950
Project: Flink
Issue Type: Bug
Components: Connectors / Hive
Affects Versions: 1.13.2
Environment: flink:flink-1.13.2
hive: 1.1.0-cdh5.14.0
Reporter: yantao
When I write to Hive with Flink streaming, I set the following Hive parameter:
{code:java}
TBLPROPERTIES ('hive.output.file.extension'='.parquet') {code}
When I inspect the files on HDFS, I cannot find any files with the .parquet suffix, yet when I run the same insert via SQL inside Hive itself, the parameter does take effect. Am I using it incorrectly, or is there another reason?
Below is my demo:
{code:java}
val tableEnvSettings: EnvironmentSettings = EnvironmentSettings.newInstance()
  .useBlinkPlanner()
  .inStreamingMode()
  .build()
val tEnv: StreamTableEnvironment = StreamTableEnvironment.create(env, tableEnvSettings)
val catalog = new HiveCatalog("myHive", "xx", "/usr/local/xx/conf")
tEnv.registerCatalog("myHive", catalog)
tEnv.useCatalog("myHive")
tEnv.useDatabase("xx")
// If using the Hive dialect, the flink-sql-connector-hive jar must be placed under flink/lib
tEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
val createTSql: String =
s"""
|create table if not exists $hiveTable (
| ...
|)
|...
|TBLPROPERTIES (
| 'sink.parallelism'='1',
| 'partition.time-extractor.timestamp-pattern'='$$dt',
| 'sink.shuffle-by-partition.enable'='true',
| 'sink.partition-commit.policy.kind'='metastore,success-file',
| 'hive.output.file.extension'='.parquet'
|)
|""".stripMargin
tEnv.executeSql(createTSql)
tEnv.getConfig.setSqlDialect(SqlDialect.DEFAULT)
tEnv.createTemporaryView("t_xx", DataStream)
val insertSql: String =
s"""
| insert into $hiveTable
| select *
| from t_xx
|""".stripMargin
tEnv.executeSql(insertSql)
tEnv.dropTemporaryView("t_xx") {code}
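
For context: when a fixed part-file suffix is required, one possible workaround (a sketch only, not part of the original report) is to write with the DataStream API's StreamingFileSink, where the suffix is set explicitly through OutputFileConfig rather than through the Hive table property. The names `stream`, the output path, and the row encoder below are placeholders, not taken from the report:
{code:java}
// Sketch: assumes a DataStream[String] named `stream`; path and encoder are placeholders.
import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.filesystem.{OutputFileConfig, StreamingFileSink}

// The suffix here is applied by Flink's sink when naming part files,
// independent of Hive's 'hive.output.file.extension' setting.
val fileConfig = OutputFileConfig.builder()
  .withPartPrefix("part")
  .withPartSuffix(".parquet")
  .build()

val sink = StreamingFileSink
  .forRowFormat(new Path("hdfs:///path/to/table"), new SimpleStringEncoder[String]("UTF-8"))
  .withOutputFileConfig(fileConfig)
  .build()

stream.addSink(sink) {code}
This does not explain why the table property is ignored by the SQL Hive sink; it only shows where a suffix is configurable on the DataStream side.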
--
This message was sent by Atlassian Jira
(v8.20.7#820007)