You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flink.apache.org by "Jun Zhang (Jira)" <ji...@apache.org> on 2020/07/10 03:23:00 UTC
[jira] [Created] (FLINK-18549) flink 1.11 can not commit partition
automatically
Jun Zhang created FLINK-18549:
---------------------------------
Summary: flink 1.11 can not commit partition automatically
Key: FLINK-18549
URL: https://issues.apache.org/jira/browse/FLINK-18549
Project: Flink
Issue Type: Bug
Components: Table SQL / Runtime
Affects Versions: 1.11.0
Reporter: Jun Zhang
Fix For: 1.11.1
I use the sql of flink 1.11, read from kafka and writing to hdfs, I found that the partition cannot be submitted automatically. This is my complete code。
My checkpoint interval is 10s. I think it should be normal that there will be _SUCCESS file under the partition of hdfs every 10s, but in fact there is no
{code:java}
StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
bsEnv.enableCheckpointing(10000);
bsEnv.setParallelism(1);
StreamTableEnvironment tEnv = StreamTableEnvironment.create(bsEnv);
String sqlSource = "CREATE TABLE source_kafka (\n" +
" appName STRING,\n" +
" appVersion STRING,\n" +
" uploadTime STRING\n" +
") WITH (\n" +
" 'connector.type' = 'kafka', \n" +
" 'connector.version' = '0.10',\n" +
" 'connector.topic' = 'test_topic',\n" +
" 'connector.properties.zookeeper.connect' = 'localhost:2181',\n" +
" 'connector.properties.bootstrap.servers' = 'localhost:9092',\n" +
" 'connector.properties.group.id' = 'testGroup',\n" +
" 'format.type'='json',\n" +
" 'update-mode' = 'append' )";
tEnv.executeSql(sqlSource);
String sql = "CREATE TABLE fs_table (\n" +
" appName STRING,\n" +
" appVersion STRING,\n" +
" uploadTime STRING,\n" +
" dt STRING," +
" h string" +
") PARTITIONED BY (dt,h) WITH (\n" +
" 'connector'='filesystem',\n" +
// " 'path'='hdfs://localhost/tmp/',\n" +
" 'sink.partition-commit.policy.kind' = 'success-file', " +
" 'format'='orc'\n" +
")";
tEnv.executeSql(sql);
String insertSql = "insert into fs_table SELECT appName ,appVersion,uploadTime, " +
" DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd'), DATE_FORMAT(LOCALTIMESTAMP, 'HH') FROM source_kafka";
tEnv.executeSql(insertSql);
{code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)