You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by Jun Zhang <zh...@gmail.com> on 2020/07/10 01:07:23 UTC

flink 1.11 使用sql写入hdfs无法自动提交分区

大家好:
 我在用flink 1.11 的sql从kafka消费然后写入hdfs的过程中,发现没法自动提交分区,请问这个是什么原因呢?谢谢

我的checkpoint设置了间隔10s,对于如下的配置,正常应该是每隔10在hdfs相应的分区下会有_SUCCESS文件,但是实际上过了好久也没有,ORC格式的结果数据是正常写入了。

public static void main(String[] args) throws Exception{
StreamExecutionEnvironment bsEnv =
StreamExecutionEnvironment.getExecutionEnvironment();
bsEnv.enableCheckpointing(10000);
bsEnv.setParallelism(1);
StreamTableEnvironment tEnv = StreamTableEnvironment.create(bsEnv);

String sqlSource = "CREATE TABLE  source_kafka (\n" +
                  "    appName  STRING,\n" +
                  "    appVersion STRING,\n" +
                  "    uploadTime STRING\n" +
                  ") WITH (\n" +
                  "  'connector.type' = 'kafka',       \n" +
                  "  'connector.version' = '0.10',\n" +
                  "  'connector.topic' = 'mytest',\n" +
                  "  'connector.properties.zookeeper.connect' =
'localhost:2181',\n" +
                  "  'connector.properties.bootstrap.servers' =
'localhost:9092',\n" +
                  "  'connector.properties.group.id' = 'testGroup',\n" +
                  "  'format.type'='json',\n" +
                  "  'update-mode' = 'append' )";

tEnv.executeSql(sqlSource);

String sql = "CREATE TABLE fs_table (\n" +
            "    appName  STRING,\n" +
            "    appVersion STRING,\n" +
            "    uploadTime STRING,\n" +
            "  dt STRING," +
            "  h string" +
            ")  PARTITIONED BY (dt,h)  WITH (\n" +
            "  'connector'='filesystem',\n" +
                     "  'path'='hdfs://localhost/tmp/',\n" +
                     " 'sink.partition-commit.policy.kind' =
'success-file', " +
                     "  'format'='orc'\n" +
                     ")";
tEnv.executeSql(sql);

String insertSql = "insert into  fs_table SELECT appName
,appVersion,uploadTime, " +
                  " DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd'),
DATE_FORMAT(LOCALTIMESTAMP, 'HH') FROM source_kafka";

tEnv.executeSql(insertSql);

}

Re: flink 1.11 使用sql写入hdfs无法自动提交分区

Posted by Jun Zhang <zh...@gmail.com>.
hi,jinsong:
这个问题不知道你后来有没有做过测试,我这里一直不行,就是并发度是1的时候,文件写入是正常的,就是没有生成success文件,如果是hive的话,就没有自动生成分区和更新分区数据。

Jun Zhang <zh...@gmail.com> 于2020年7月23日周四 上午11:34写道:

> hi,jinsong:
>
> 这个问题不知道你后来有没有做过测试,我这里一直不行,就是并发度是1的时候,文件写入是正常的,就是没有生成success文件,如果是hive的话,就没有自动生成分区和更新分区数据。
>
> Jun Zhang <zh...@gmail.com> 于2020年7月23日周四 上午11:15写道:
>
>> hi,夏帅:
>>
>> 抱歉,这几天没搞这个,我这个问题是文件是正常写入hdfs了,但是没有自动提交,也没有错误日志,就是如果写入的是文件系统,没有SUCCESS文件,写入hive的话,没有自动更新分区。
>>
>> 你测试没有问题的情况并行度是 1 吗?写入hdfs?
>>
>> 夏帅 <jk...@dingtalk.com> 于2020年7月10日周五 下午5:39写道:
>>
>>> 你好,
>>> 我这边同样的代码,并没有出现类似的问题
>>> 是本地跑么,可以提供下日志信息么?
>>>
>>>

回复: flink 1.11 使用sql写入hdfs无法自动提交分区

Posted by "xuefli@outlook.com" <xu...@outlook.com>.
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/streamfile_sink.html

应该是这个原因
General

Important Note 1: When using Hadoop < 2.7, please use the OnCheckpointRollingPolicy which rolls part files on every checkpoint. The reason is that if part files “traverse” the checkpoint interval, then, upon recovery from a failure the StreamingFileSink may use the truncate() method of the filesystem to discard uncommitted data from the in-progress file. This method is not supported by pre-2.7 Hadoop versions and Flink will throw an exception.


发送自 Windows 10 版邮件<https://go.microsoft.com/fwlink/?LinkId=550986>应用

发件人: Jun Zhang<ma...@gmail.com>
发送时间: 2020年7月23日 12:55
收件人: Jingsong Li<ma...@gmail.com>
抄送: user-zh<ma...@flink.apache.org>
主题: Re: flink 1.11 使用sql写入hdfs无法自动提交分区

hi,jinsong
我们生产环境hdfs是cdh 2.6的,我换了一个hadoop 3 版本的hdfs,还真没问题了,不知道是哪里出问题了。

Jingsong Li <ji...@gmail.com> 于2020年7月23日周四 上午11:45写道:

> 相同操作我也没有复现。。是可以成功执行的
>
> 你的HDFS是什么版本?是否可以考虑换个来测试下
>
> On Thu, Jul 23, 2020 at 11:34 AM Jun Zhang <zh...@gmail.com>
> wrote:
>
>> hi,jinsong:
>>
>> 这个问题不知道你后来有没有做过测试,我这里一直不行,就是并发度是1的时候,文件写入是正常的,就是没有生成success文件,如果是hive的话,就没有自动生成分区和更新分区数据。
>>
>> Jun Zhang <zh...@gmail.com> 于2020年7月23日周四 上午11:15写道:
>>
>>> hi,夏帅:
>>>
>>> 抱歉,这几天没搞这个,我这个问题是文件是正常写入hdfs了,但是没有自动提交,也没有错误日志,就是如果写入的是文件系统,没有SUCCESS文件,写入hive的话,没有自动更新分区。
>>>
>>> 你测试没有问题的情况并行度是 1 吗?写入hdfs?
>>>
>>> 夏帅 <jk...@dingtalk.com> 于2020年7月10日周五 下午5:39写道:
>>>
>>>> 你好,
>>>> 我这边同样的代码,并没有出现类似的问题
>>>> 是本地跑么,可以提供下日志信息么?
>>>>
>>>>
>
> --
> Best, Jingsong Lee
>


Re: flink 1.11 使用sql写入hdfs无法自动提交分区

Posted by Jun Zhang <zh...@gmail.com>.
hi,jinsong
我们生产环境hdfs是cdh 2.6的,我换了一个hadoop 3 版本的hdfs,还真没问题了,不知道是哪里出问题了。

Jingsong Li <ji...@gmail.com> 于2020年7月23日周四 上午11:45写道:

> 相同操作我也没有复现。。是可以成功执行的
>
> 你的HDFS是什么版本?是否可以考虑换个来测试下
>
> On Thu, Jul 23, 2020 at 11:34 AM Jun Zhang <zh...@gmail.com>
> wrote:
>
>> hi,jinsong:
>>
>> 这个问题不知道你后来有没有做过测试,我这里一直不行,就是并发度是1的时候,文件写入是正常的,就是没有生成success文件,如果是hive的话,就没有自动生成分区和更新分区数据。
>>
>> Jun Zhang <zh...@gmail.com> 于2020年7月23日周四 上午11:15写道:
>>
>>> hi,夏帅:
>>>
>>> 抱歉,这几天没搞这个,我这个问题是文件是正常写入hdfs了,但是没有自动提交,也没有错误日志,就是如果写入的是文件系统,没有SUCCESS文件,写入hive的话,没有自动更新分区。
>>>
>>> 你测试没有问题的情况并行度是 1 吗?写入hdfs?
>>>
>>> 夏帅 <jk...@dingtalk.com> 于2020年7月10日周五 下午5:39写道:
>>>
>>>> 你好,
>>>> 我这边同样的代码,并没有出现类似的问题
>>>> 是本地跑么,可以提供下日志信息么?
>>>>
>>>>
>
> --
> Best, Jingsong Lee
>

Re: flink 1.11 使用sql写入hdfs无法自动提交分区

Posted by Jingsong Li <ji...@gmail.com>.
相同操作我也没有复现。。是可以成功执行的

你的HDFS是什么版本?是否可以考虑换个来测试下

On Thu, Jul 23, 2020 at 11:34 AM Jun Zhang <zh...@gmail.com>
wrote:

> hi,jinsong:
>
> 这个问题不知道你后来有没有做过测试,我这里一直不行,就是并发度是1的时候,文件写入是正常的,就是没有生成success文件,如果是hive的话,就没有自动生成分区和更新分区数据。
>
> Jun Zhang <zh...@gmail.com> 于2020年7月23日周四 上午11:15写道:
>
>> hi,夏帅:
>>
>> 抱歉,这几天没搞这个,我这个问题是文件是正常写入hdfs了,但是没有自动提交,也没有错误日志,就是如果写入的是文件系统,没有SUCCESS文件,写入hive的话,没有自动更新分区。
>>
>> 你测试没有问题的情况并行度是 1 吗?写入hdfs?
>>
>> 夏帅 <jk...@dingtalk.com> 于2020年7月10日周五 下午5:39写道:
>>
>>> 你好,
>>> 我这边同样的代码,并没有出现类似的问题
>>> 是本地跑么,可以提供下日志信息么?
>>>
>>>

-- 
Best, Jingsong Lee

Re: flink 1.11 使用sql写入hdfs无法自动提交分区

Posted by Jun Zhang <zh...@gmail.com>.
hi,jinsong:
这个问题不知道你后来有没有做过测试,我这里一直不行,就是并发度是1的时候,文件写入是正常的,就是没有生成success文件,如果是hive的话,就没有自动生成分区和更新分区数据。

Jun Zhang <zh...@gmail.com> 于2020年7月23日周四 上午11:15写道:

> hi,夏帅:
>
> 抱歉,这几天没搞这个,我这个问题是文件是正常写入hdfs了,但是没有自动提交,也没有错误日志,就是如果写入的是文件系统,没有SUCCESS文件,写入hive的话,没有自动更新分区。
>
> 你测试没有问题的情况并行度是 1 吗?写入hdfs?
>
> 夏帅 <jk...@dingtalk.com> 于2020年7月10日周五 下午5:39写道:
>
>> 你好,
>> 我这边同样的代码,并没有出现类似的问题
>> 是本地跑么,可以提供下日志信息么?
>>
>>

Re: flink 1.11 使用sql写入hdfs无法自动提交分区

Posted by Jun Zhang <zh...@gmail.com>.
hi,夏帅:
抱歉,这几天没搞这个,我这个问题是文件是正常写入hdfs了,但是没有自动提交,也没有错误日志,就是如果写入的是文件系统,没有SUCCESS文件,写入hive的话,没有自动更新分区。

你测试没有问题的情况并行度是 1 吗?写入hdfs?

夏帅 <jk...@dingtalk.com> 于2020年7月10日周五 下午5:39写道:

> 你好,
> 我这边同样的代码,并没有出现类似的问题
> 是本地跑么,可以提供下日志信息么?
>
>

回复:flink 1.11 使用sql写入hdfs无法自动提交分区

Posted by 夏帅 <jk...@dingtalk.com.INVALID>.
你好,
我这边同样的代码,并没有出现类似的问题
是本地跑么,可以提供下日志信息么?


Re: flink 1.11 使用sql写入hdfs无法自动提交分区

Posted by Jun Zhang <zh...@gmail.com>.
此外,如果设置并行度是大于1,虽然可以生成success文件,但是貌似不是第一次checkpoint结束的时候就生成了,我反复测试之后,好像是不固定的时间,比如可能是第5次,也可能是第10次checkpoint之后才生成的。

Jingsong Li <ji...@gmail.com> 于2020年7月10日周五 下午3:35写道:

> Hi, Jun,
>
> 非常感谢详细充分的测试~
>
> 接下来我复现排查下~
>
> Best,
> Jingsong
>
> On Fri, Jul 10, 2020 at 3:09 PM Jun Zhang <zh...@gmail.com>
> wrote:
>
> > hi,jinsong:
> >
> > 在我的测试环境下,对于我贴出来的那个代码。
> >
> >
> 1.如果source使用的有界的数据,比如bsEnv.fromElements(...),这样会有success文件生成,如果是kafka数据,就不行。
> > 2.如果设置程序的并行度是大于1,那么也会有success生成。
> > 3.如果我写入的是local file,比如 file:///tmp/aaa ,而不是hdfs,也会有success文件生成。
> >
> >
> >
> 综上,在并行度设置为1,消费的是kafka的永不停止的数据,写入的是hdfs,我的checkpoint设置是10s,这种情况下,我测试了好多遍,都没有success文件生成。
> >
> > Jingsong Li <ji...@gmail.com> 于2020年7月10日周五 下午2:54写道:
> >
> > > Hi,
> > >
> > > 默认情况下,对ORC来说,理论上一旦有正式数据文件的生成,就会有对应SUCCESS文件产生,你是怎么确认没有SUCCESS文件的呢?
> > > 我用同样SQL在我的环境是有的。
> > >
> > > Best,
> > > Jingsong
> > >
> > > On Fri, Jul 10, 2020 at 9:07 AM Jun Zhang <zh...@gmail.com>
> > > wrote:
> > >
> > > > 大家好:
> > > >  我在用flink 1.11 的sql从kafka消费然后写入hdfs的过程中,发现没法自动提交分区,请问这个是什么原因呢?谢谢
> > > >
> > > >
> > > >
> > >
> >
> 我的checkpoint设置了间隔10s,对于如下的配置,正常应该是每隔10在hdfs相应的分区下会有_SUCCESS文件,但是实际上过了好久也没有,ORC格式的结果数据是正常写入了。
> > > >
> > > > public static void main(String[] args) throws Exception{
> > > > StreamExecutionEnvironment bsEnv =
> > > > StreamExecutionEnvironment.getExecutionEnvironment();
> > > > bsEnv.enableCheckpointing(10000);
> > > > bsEnv.setParallelism(1);
> > > > StreamTableEnvironment tEnv = StreamTableEnvironment.create(bsEnv);
> > > >
> > > > String sqlSource = "CREATE TABLE  source_kafka (\n" +
> > > >                   "    appName  STRING,\n" +
> > > >                   "    appVersion STRING,\n" +
> > > >                   "    uploadTime STRING\n" +
> > > >                   ") WITH (\n" +
> > > >                   "  'connector.type' = 'kafka',       \n" +
> > > >                   "  'connector.version' = '0.10',\n" +
> > > >                   "  'connector.topic' = 'mytest',\n" +
> > > >                   "  'connector.properties.zookeeper.connect' =
> > > > 'localhost:2181',\n" +
> > > >                   "  'connector.properties.bootstrap.servers' =
> > > > 'localhost:9092',\n" +
> > > >                   "  'connector.properties.group.id' =
> > 'testGroup',\n" +
> > > >                   "  'format.type'='json',\n" +
> > > >                   "  'update-mode' = 'append' )";
> > > >
> > > > tEnv.executeSql(sqlSource);
> > > >
> > > > String sql = "CREATE TABLE fs_table (\n" +
> > > >             "    appName  STRING,\n" +
> > > >             "    appVersion STRING,\n" +
> > > >             "    uploadTime STRING,\n" +
> > > >             "  dt STRING," +
> > > >             "  h string" +
> > > >             ")  PARTITIONED BY (dt,h)  WITH (\n" +
> > > >             "  'connector'='filesystem',\n" +
> > > >                      "  'path'='hdfs://localhost/tmp/',\n" +
> > > >                      " 'sink.partition-commit.policy.kind' =
> > > > 'success-file', " +
> > > >                      "  'format'='orc'\n" +
> > > >                      ")";
> > > > tEnv.executeSql(sql);
> > > >
> > > > String insertSql = "insert into  fs_table SELECT appName
> > > > ,appVersion,uploadTime, " +
> > > >                   " DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd'),
> > > > DATE_FORMAT(LOCALTIMESTAMP, 'HH') FROM source_kafka";
> > > >
> > > > tEnv.executeSql(insertSql);
> > > >
> > > > }
> > > >
> > >
> > >
> > > --
> > > Best, Jingsong Lee
> > >
> >
>
>
> --
> Best, Jingsong Lee
>

Re: flink 1.11 使用sql写入hdfs无法自动提交分区

Posted by Jingsong Li <ji...@gmail.com>.
Hi, Jun,

非常感谢详细充分的测试~

接下来我复现排查下~

Best,
Jingsong

On Fri, Jul 10, 2020 at 3:09 PM Jun Zhang <zh...@gmail.com>
wrote:

> hi,jinsong:
>
> 在我的测试环境下,对于我贴出来的那个代码。
>
> 1.如果source使用的有界的数据,比如bsEnv.fromElements(...),这样会有success文件生成,如果是kafka数据,就不行。
> 2.如果设置程序的并行度是大于1,那么也会有success生成。
> 3.如果我写入的是local file,比如 file:///tmp/aaa ,而不是hdfs,也会有success文件生成。
>
>
> 综上,在并行度设置为1,消费的是kafka的永不停止的数据,写入的是hdfs,我的checkpoint设置是10s,这种情况下,我测试了好多遍,都没有success文件生成。
>
> Jingsong Li <ji...@gmail.com> 于2020年7月10日周五 下午2:54写道:
>
> > Hi,
> >
> > 默认情况下,对ORC来说,理论上一旦有正式数据文件的生成,就会有对应SUCCESS文件产生,你是怎么确认没有SUCCESS文件的呢?
> > 我用同样SQL在我的环境是有的。
> >
> > Best,
> > Jingsong
> >
> > On Fri, Jul 10, 2020 at 9:07 AM Jun Zhang <zh...@gmail.com>
> > wrote:
> >
> > > 大家好:
> > >  我在用flink 1.11 的sql从kafka消费然后写入hdfs的过程中,发现没法自动提交分区,请问这个是什么原因呢?谢谢
> > >
> > >
> > >
> >
> 我的checkpoint设置了间隔10s,对于如下的配置,正常应该是每隔10在hdfs相应的分区下会有_SUCCESS文件,但是实际上过了好久也没有,ORC格式的结果数据是正常写入了。
> > >
> > > public static void main(String[] args) throws Exception{
> > > StreamExecutionEnvironment bsEnv =
> > > StreamExecutionEnvironment.getExecutionEnvironment();
> > > bsEnv.enableCheckpointing(10000);
> > > bsEnv.setParallelism(1);
> > > StreamTableEnvironment tEnv = StreamTableEnvironment.create(bsEnv);
> > >
> > > String sqlSource = "CREATE TABLE  source_kafka (\n" +
> > >                   "    appName  STRING,\n" +
> > >                   "    appVersion STRING,\n" +
> > >                   "    uploadTime STRING\n" +
> > >                   ") WITH (\n" +
> > >                   "  'connector.type' = 'kafka',       \n" +
> > >                   "  'connector.version' = '0.10',\n" +
> > >                   "  'connector.topic' = 'mytest',\n" +
> > >                   "  'connector.properties.zookeeper.connect' =
> > > 'localhost:2181',\n" +
> > >                   "  'connector.properties.bootstrap.servers' =
> > > 'localhost:9092',\n" +
> > >                   "  'connector.properties.group.id' =
> 'testGroup',\n" +
> > >                   "  'format.type'='json',\n" +
> > >                   "  'update-mode' = 'append' )";
> > >
> > > tEnv.executeSql(sqlSource);
> > >
> > > String sql = "CREATE TABLE fs_table (\n" +
> > >             "    appName  STRING,\n" +
> > >             "    appVersion STRING,\n" +
> > >             "    uploadTime STRING,\n" +
> > >             "  dt STRING," +
> > >             "  h string" +
> > >             ")  PARTITIONED BY (dt,h)  WITH (\n" +
> > >             "  'connector'='filesystem',\n" +
> > >                      "  'path'='hdfs://localhost/tmp/',\n" +
> > >                      " 'sink.partition-commit.policy.kind' =
> > > 'success-file', " +
> > >                      "  'format'='orc'\n" +
> > >                      ")";
> > > tEnv.executeSql(sql);
> > >
> > > String insertSql = "insert into  fs_table SELECT appName
> > > ,appVersion,uploadTime, " +
> > >                   " DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd'),
> > > DATE_FORMAT(LOCALTIMESTAMP, 'HH') FROM source_kafka";
> > >
> > > tEnv.executeSql(insertSql);
> > >
> > > }
> > >
> >
> >
> > --
> > Best, Jingsong Lee
> >
>


-- 
Best, Jingsong Lee

Re: flink 1.11 使用sql写入hdfs无法自动提交分区

Posted by Jun Zhang <zh...@gmail.com>.
hi,jinsong:

在我的测试环境下,对于我贴出来的那个代码。
1.如果source使用的有界的数据,比如bsEnv.fromElements(...),这样会有success文件生成,如果是kafka数据,就不行。
2.如果设置程序的并行度是大于1,那么也会有success生成。
3.如果我写入的是local file,比如 file:///tmp/aaa ,而不是hdfs,也会有success文件生成。

综上,在并行度设置为1,消费的是kafka的永不停止的数据,写入的是hdfs,我的checkpoint设置是10s,这种情况下,我测试了好多遍,都没有success文件生成。

Jingsong Li <ji...@gmail.com> 于2020年7月10日周五 下午2:54写道:

> Hi,
>
> 默认情况下,对ORC来说,理论上一旦有正式数据文件的生成,就会有对应SUCCESS文件产生,你是怎么确认没有SUCCESS文件的呢?
> 我用同样SQL在我的环境是有的。
>
> Best,
> Jingsong
>
> On Fri, Jul 10, 2020 at 9:07 AM Jun Zhang <zh...@gmail.com>
> wrote:
>
> > 大家好:
> >  我在用flink 1.11 的sql从kafka消费然后写入hdfs的过程中,发现没法自动提交分区,请问这个是什么原因呢?谢谢
> >
> >
> >
> 我的checkpoint设置了间隔10s,对于如下的配置,正常应该是每隔10在hdfs相应的分区下会有_SUCCESS文件,但是实际上过了好久也没有,ORC格式的结果数据是正常写入了。
> >
> > public static void main(String[] args) throws Exception{
> > StreamExecutionEnvironment bsEnv =
> > StreamExecutionEnvironment.getExecutionEnvironment();
> > bsEnv.enableCheckpointing(10000);
> > bsEnv.setParallelism(1);
> > StreamTableEnvironment tEnv = StreamTableEnvironment.create(bsEnv);
> >
> > String sqlSource = "CREATE TABLE  source_kafka (\n" +
> >                   "    appName  STRING,\n" +
> >                   "    appVersion STRING,\n" +
> >                   "    uploadTime STRING\n" +
> >                   ") WITH (\n" +
> >                   "  'connector.type' = 'kafka',       \n" +
> >                   "  'connector.version' = '0.10',\n" +
> >                   "  'connector.topic' = 'mytest',\n" +
> >                   "  'connector.properties.zookeeper.connect' =
> > 'localhost:2181',\n" +
> >                   "  'connector.properties.bootstrap.servers' =
> > 'localhost:9092',\n" +
> >                   "  'connector.properties.group.id' = 'testGroup',\n" +
> >                   "  'format.type'='json',\n" +
> >                   "  'update-mode' = 'append' )";
> >
> > tEnv.executeSql(sqlSource);
> >
> > String sql = "CREATE TABLE fs_table (\n" +
> >             "    appName  STRING,\n" +
> >             "    appVersion STRING,\n" +
> >             "    uploadTime STRING,\n" +
> >             "  dt STRING," +
> >             "  h string" +
> >             ")  PARTITIONED BY (dt,h)  WITH (\n" +
> >             "  'connector'='filesystem',\n" +
> >                      "  'path'='hdfs://localhost/tmp/',\n" +
> >                      " 'sink.partition-commit.policy.kind' =
> > 'success-file', " +
> >                      "  'format'='orc'\n" +
> >                      ")";
> > tEnv.executeSql(sql);
> >
> > String insertSql = "insert into  fs_table SELECT appName
> > ,appVersion,uploadTime, " +
> >                   " DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd'),
> > DATE_FORMAT(LOCALTIMESTAMP, 'HH') FROM source_kafka";
> >
> > tEnv.executeSql(insertSql);
> >
> > }
> >
>
>
> --
> Best, Jingsong Lee
>

Re: flink 1.11 使用sql写入hdfs无法自动提交分区

Posted by Jingsong Li <ji...@gmail.com>.
Hi,

默认情况下,对ORC来说,理论上一旦有正式数据文件的生成,就会有对应SUCCESS文件产生,你是怎么确认没有SUCCESS文件的呢?
我用同样SQL在我的环境是有的。

Best,
Jingsong

On Fri, Jul 10, 2020 at 9:07 AM Jun Zhang <zh...@gmail.com>
wrote:

> 大家好:
>  我在用flink 1.11 的sql从kafka消费然后写入hdfs的过程中,发现没法自动提交分区,请问这个是什么原因呢?谢谢
>
>
> 我的checkpoint设置了间隔10s,对于如下的配置,正常应该是每隔10在hdfs相应的分区下会有_SUCCESS文件,但是实际上过了好久也没有,ORC格式的结果数据是正常写入了。
>
> public static void main(String[] args) throws Exception{
> StreamExecutionEnvironment bsEnv =
> StreamExecutionEnvironment.getExecutionEnvironment();
> bsEnv.enableCheckpointing(10000);
> bsEnv.setParallelism(1);
> StreamTableEnvironment tEnv = StreamTableEnvironment.create(bsEnv);
>
> String sqlSource = "CREATE TABLE  source_kafka (\n" +
>                   "    appName  STRING,\n" +
>                   "    appVersion STRING,\n" +
>                   "    uploadTime STRING\n" +
>                   ") WITH (\n" +
>                   "  'connector.type' = 'kafka',       \n" +
>                   "  'connector.version' = '0.10',\n" +
>                   "  'connector.topic' = 'mytest',\n" +
>                   "  'connector.properties.zookeeper.connect' =
> 'localhost:2181',\n" +
>                   "  'connector.properties.bootstrap.servers' =
> 'localhost:9092',\n" +
>                   "  'connector.properties.group.id' = 'testGroup',\n" +
>                   "  'format.type'='json',\n" +
>                   "  'update-mode' = 'append' )";
>
> tEnv.executeSql(sqlSource);
>
> String sql = "CREATE TABLE fs_table (\n" +
>             "    appName  STRING,\n" +
>             "    appVersion STRING,\n" +
>             "    uploadTime STRING,\n" +
>             "  dt STRING," +
>             "  h string" +
>             ")  PARTITIONED BY (dt,h)  WITH (\n" +
>             "  'connector'='filesystem',\n" +
>                      "  'path'='hdfs://localhost/tmp/',\n" +
>                      " 'sink.partition-commit.policy.kind' =
> 'success-file', " +
>                      "  'format'='orc'\n" +
>                      ")";
> tEnv.executeSql(sql);
>
> String insertSql = "insert into  fs_table SELECT appName
> ,appVersion,uploadTime, " +
>                   " DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd'),
> DATE_FORMAT(LOCALTIMESTAMP, 'HH') FROM source_kafka";
>
> tEnv.executeSql(insertSql);
>
> }
>


-- 
Best, Jingsong Lee