Posted to user-zh@flink.apache.org by 陈卓宇 <25...@qq.com.INVALID> on 2021/11/11 08:50:19 UTC

Flink SQL sink writes many small CSV files

Problem: writing to local storage produced 8 files named like part-817677cc-2f4b-464a-bf9e-11957bcf9c76-0-0 ...
I only want a single CSV file. How do I turn off this file splitting?



Flink SQL:
String tw_smart_tag="CREATE TABLE tw_smart_tag (\n" +
        "  id STRING,\n" +
        "  tag_code STRING,\n"+
        "  parent_id STRING,\n"+
        "  name STRING,\n"+
        "  type STRING,\n"+
        "  tag_type STRING,\n"+
        "  data_type STRING,\n"+
        "  status STRING,\n"+
        "  valid_status STRING,\n"+
        "  opr_status STRING,\n"+
        "  online STRING,\n"+
        "  opr_type STRING,\n"+
        "  opr_time STRING,\n"+
        "  invalid_time STRING,\n"+
        "  auth_type STRING,\n"+
        "  remark STRING,\n"+
        "  sort STRING,\n"+
        "  batch_no STRING,\n"+
        "  created_by STRING,\n"+
        "  created_time STRING,\n"+
        "  updated_by STRING,\n"+
        "  updated_time STRING\n"+
        ") WITH (\n" +
        "  'connector' = 'filesystem',           -- required: specify the connector type\n" +
        "  'path' = 'hdfs://ark1:8020//tmp/usertag/20211029/db_31abd9593e9983ec/metadata/tw_smart_tag.csv',  -- required: path to a directory\n" +
        "  'format' = 'csv'                   -- required: the filesystem connector needs a format; see the Table Formats section for details\n" +
        ")\n";

String tw_smart_tag_detail="CREATE TABLE tw_smart_tag_detail (\n" +
        "  id STRING,\n" +
        "  tag_id STRING,\n"+
        "  code STRING,\n"+
        "  name STRING,\n"+
        "  content STRING,\n"+
        "  status STRING,\n"+
        "  created_by STRING,\n"+
        "  created_time STRING,\n"+
        "  updated_by STRING,\n"+
        "  updated_time STRING\n"+
        ") WITH (\n" +
        "  'connector' = 'filesystem',           -- required: specify the connector type\n" +
        "  'path' = 'hdfs://ark1:8020//tmp/usertag/20211029/db_31abd9593e9983ec/metadata/tw_smart_tag_detail.csv',  -- required: path to a directory\n" +
        "  'format' = 'csv'                   -- required: the filesystem connector needs a format; see the Table Formats section for details\n" +
        ")\n";

// Export to local
String loaclhostFile="CREATE TABLE loaclhost_File (\n" +
        "  id STRING,\n" +
        "  tag_code STRING,\n"+
        "  name STRING,\n"+
        "  data_type STRING,\n"+
        "  detailID STRING,\n"+
        "  tag_id STRING,\n"+
        "  detailName STRING\n"+
        ") WITH (\n" +
        "  'connector' = 'filesystem',           -- required: specify the connector type\n" +
        "  'path' = 'hdfs://ark1:8020//tmp/usertag/20211029/data/',  -- required: path to a directory\n" +
        "  'format' = 'csv'                   -- required: the filesystem connector needs a format; see the Table Formats section for details\n" +
        ")\n";

String joinSQL = "insert into loaclhost_File\n" +
        "SELECT tw_smart_tag.id AS id,\n" +
        "tw_smart_tag.tag_code AS tag_code,\n" +
        "tw_smart_tag.name AS name,\n" +
        "tw_smart_tag.data_type AS data_type,\n" +
        "tw_smart_tag_detail.id AS detailID,\n" +
        "tw_smart_tag_detail.tag_id AS tag_id,\n" +
        "tw_smart_tag_detail.name AS detailName\n" +
                "FROM tw_smart_tag INNER JOIN tw_smart_tag_detail ON tw_smart_tag.id = tw_smart_tag_detail.tag_id";
// tw_smart_tag.id = tw_smart_tag_detail.tag_id
tenv.executeSql(tw_smart_tag).print();
tenv.executeSql(tw_smart_tag_detail).print();
tenv.executeSql(loaclhostFile).print();
tenv.executeSql(joinSQL).print();

陈卓宇



Re: Flink SQL sink writes many small CSV files

Posted by Caizhi Weng <ts...@gmail.com>.
Hi!

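The number of files a filesystem sink writes depends on the sink parallelism. If the data volume is small, consider adding
'sink.parallelism' = '1' to the WITH options of the sink DDL to set the sink parallelism to 1.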
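
As a rough sketch of that suggestion, the sink DDL from the question could be built as below. The class name SingleFileSinkDdl and the helper buildSinkDdl are made up for illustration; only the 'sink.parallelism' option comes from the advice above. The snippet only builds and prints the DDL string, so it runs without Flink on the classpath. Note that the output would presumably still be a part-... file inside the 'path' directory rather than a file with a name of your choosing.

```java
class SingleFileSinkDdl {

    // Hypothetical helper: rebuilds the loaclhost_File sink DDL from the
    // question and appends the 'sink.parallelism' option to the WITH clause.
    static String buildSinkDdl(int sinkParallelism) {
        return "CREATE TABLE loaclhost_File (\n"
                + "  id STRING,\n"
                + "  tag_code STRING,\n"
                + "  name STRING,\n"
                + "  data_type STRING,\n"
                + "  detailID STRING,\n"
                + "  tag_id STRING,\n"
                + "  detailName STRING\n"
                + ") WITH (\n"
                + "  'connector' = 'filesystem',\n"
                + "  'path' = 'hdfs://ark1:8020//tmp/usertag/20211029/data/',\n"
                + "  'format' = 'csv',\n"
                // One sink subtask means one writer, hence one output file per run.
                + "  'sink.parallelism' = '" + sinkParallelism + "'\n"
                + ")";
    }

    public static void main(String[] args) {
        String ddl = buildSinkDdl(1);
        System.out.println(ddl);
        // In the job from the question, this string would be passed to
        // tenv.executeSql(ddl) in place of the original loaclhostFile DDL.
    }
}
```

Only the sink is forced to a single subtask here; the sources and the join keep the job's default parallelism, which is why this fits small outputs best.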
