You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Jun Zhang (Jira)" <ji...@apache.org> on 2020/07/10 03:24:00 UTC

[jira] [Updated] (FLINK-18549) flink 1.11 can not commit partition automatically

     [ https://issues.apache.org/jira/browse/FLINK-18549?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jun Zhang updated FLINK-18549:
------------------------------
    Description: 
I use the sql of flink 1.11, read from kafka and writing to hdfs, I found that the partition cannot be submitted automatically. This is my complete code。

My checkpoint interval is 10s. I think it should be normal that there will be _SUCCESS file under the partition of hdfs every 10s, but in fact there is no

 
{code:java}
       StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
      bsEnv.enableCheckpointing(10000);
      bsEnv.setParallelism(1);
      StreamTableEnvironment tEnv = StreamTableEnvironment.create(bsEnv);

      String sqlSource = "CREATE TABLE  source_kafka (\n" +
                         "    appName  STRING,\n" +
                         "    appVersion STRING,\n" +
                         "    uploadTime STRING\n" +
                         ") WITH (\n" +
                         "  'connector.type' = 'kafka',       \n" +
                         "  'connector.version' = '0.10',\n" +
                         "  'connector.topic' = 'test_topic',\n" +
                         "  'connector.properties.zookeeper.connect' = 'localhost:2181',\n" +
                         "  'connector.properties.bootstrap.servers' = 'localhost:9092',\n" +
                         "  'connector.properties.group.id' = 'testGroup',\n" +
                         "  'format.type'='json',\n" +
                         "  'update-mode' = 'append' )";

      tEnv.executeSql(sqlSource);


      String sql = "CREATE TABLE fs_table (\n" +
                   "    appName  STRING,\n" +
                   "    appVersion STRING,\n" +
                   "    uploadTime STRING,\n" +
                   "  dt STRING," +
                   "  h string" +
                   ")  PARTITIONED BY (dt,h)  WITH (\n" +
                   "  'connector'='filesystem',\n" +
                     "  'path'='hdfs://localhost/tmp/',\n" +
                     " 'sink.partition-commit.policy.kind' = 'success-file', " +
                     "  'format'='orc'\n" +
                     ")";
      tEnv.executeSql(sql);

      String insertSql = "insert into  fs_table SELECT appName ,appVersion,uploadTime, " +
                         " DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd'), DATE_FORMAT(LOCALTIMESTAMP, 'HH') FROM source_kafka";

      tEnv.executeSql(insertSql);
{code}

  was:
I use the sql of flink 1.11, read from kafka and writing to hdfs, I found that the partition cannot be submitted automatically. This is my complete code。

My checkpoint interval is 10s. I think it should be normal that there will be _SUCCESS file under the partition of hdfs every 10s, but in fact there is no

 
{code:java}
       StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
      bsEnv.enableCheckpointing(10000);
      bsEnv.setParallelism(1);
      StreamTableEnvironment tEnv = StreamTableEnvironment.create(bsEnv);

      String sqlSource = "CREATE TABLE  source_kafka (\n" +
                         "    appName  STRING,\n" +
                         "    appVersion STRING,\n" +
                         "    uploadTime STRING\n" +
                         ") WITH (\n" +
                         "  'connector.type' = 'kafka',       \n" +
                         "  'connector.version' = '0.10',\n" +
                         "  'connector.topic' = 'test_topic',\n" +
                         "  'connector.properties.zookeeper.connect' = 'localhost:2181',\n" +
                         "  'connector.properties.bootstrap.servers' = 'localhost:9092',\n" +
                         "  'connector.properties.group.id' = 'testGroup',\n" +
                         "  'format.type'='json',\n" +
                         "  'update-mode' = 'append' )";

      tEnv.executeSql(sqlSource);


      String sql = "CREATE TABLE fs_table (\n" +
                   "    appName  STRING,\n" +
                   "    appVersion STRING,\n" +
                   "    uploadTime STRING,\n" +
                   "  dt STRING," +
                   "  h string" +
                   ")  PARTITIONED BY (dt,h)  WITH (\n" +
                   "  'connector'='filesystem',\n" +
//                     "  'path'='hdfs://localhost/tmp/',\n" +
                     " 'sink.partition-commit.policy.kind' = 'success-file', " +
                     "  'format'='orc'\n" +
                     ")";
      tEnv.executeSql(sql);

      String insertSql = "insert into  fs_table SELECT appName ,appVersion,uploadTime, " +
                         " DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd'), DATE_FORMAT(LOCALTIMESTAMP, 'HH') FROM source_kafka";

      tEnv.executeSql(insertSql);
{code}


> flink 1.11 can not commit partition automatically
> -------------------------------------------------
>
>                 Key: FLINK-18549
>                 URL: https://issues.apache.org/jira/browse/FLINK-18549
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>    Affects Versions: 1.11.0
>            Reporter: Jun Zhang
>            Priority: Major
>             Fix For: 1.11.1
>
>
> I use the sql of flink 1.11, read from kafka and writing to hdfs, I found that the partition cannot be submitted automatically. This is my complete code。
> My checkpoint interval is 10s. I think it should be normal that there will be _SUCCESS file under the partition of hdfs every 10s, but in fact there is no
>  
> {code:java}
>        StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
>       bsEnv.enableCheckpointing(10000);
>       bsEnv.setParallelism(1);
>       StreamTableEnvironment tEnv = StreamTableEnvironment.create(bsEnv);
>       String sqlSource = "CREATE TABLE  source_kafka (\n" +
>                          "    appName  STRING,\n" +
>                          "    appVersion STRING,\n" +
>                          "    uploadTime STRING\n" +
>                          ") WITH (\n" +
>                          "  'connector.type' = 'kafka',       \n" +
>                          "  'connector.version' = '0.10',\n" +
>                          "  'connector.topic' = 'test_topic',\n" +
>                          "  'connector.properties.zookeeper.connect' = 'localhost:2181',\n" +
>                          "  'connector.properties.bootstrap.servers' = 'localhost:9092',\n" +
>                          "  'connector.properties.group.id' = 'testGroup',\n" +
>                          "  'format.type'='json',\n" +
>                          "  'update-mode' = 'append' )";
>       tEnv.executeSql(sqlSource);
>       String sql = "CREATE TABLE fs_table (\n" +
>                    "    appName  STRING,\n" +
>                    "    appVersion STRING,\n" +
>                    "    uploadTime STRING,\n" +
>                    "  dt STRING," +
>                    "  h string" +
>                    ")  PARTITIONED BY (dt,h)  WITH (\n" +
>                    "  'connector'='filesystem',\n" +
>                      "  'path'='hdfs://localhost/tmp/',\n" +
>                      " 'sink.partition-commit.policy.kind' = 'success-file', " +
>                      "  'format'='orc'\n" +
>                      ")";
>       tEnv.executeSql(sql);
>       String insertSql = "insert into  fs_table SELECT appName ,appVersion,uploadTime, " +
>                          " DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd'), DATE_FORMAT(LOCALTIMESTAMP, 'HH') FROM source_kafka";
>       tEnv.executeSql(insertSql);
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)