Posted to user-zh@flink.apache.org by admin <17...@163.com> on 2020/11/18 03:41:24 UTC

Problem merging small files with a custom partition commit policy

Hi,
We have the following requirement: after streaming data into the warehouse, partitions should be added and small files merged automatically.
I followed an online example of a custom partition commit policy that merges small files [1] and tested it.
When this custom policy is used with the filesystem connector, it merges the files correctly and produces the merged target file.

Since the built-in metastore policy can only be used on Hive tables, I also tested writing into a Hive table through the Hive catalog. Automatic partition creation works fine there, but merging the small files does not: no merged target file is produced, and no exception is thrown.

What puzzles me is that the exact same code works when writing to HDFS but fails when writing to Hive, even though, judging from the source code, the Hive sink is also built on StreamingFileSink underneath. I have spent two days troubleshooting this without making any progress. Has anyone run into this before, or does anyone have ideas on how to track it down?

The policy code is as follows:
package policy;

import org.apache.flink.table.filesystem.PartitionCommitPolicy;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.ParquetFileWriter.Mode;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.example.ExampleParquetWriter;
import org.apache.parquet.hadoop.example.GroupReadSupport;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.schema.MessageType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class ParquetFileMergingCommitPolicy implements PartitionCommitPolicy {
    private static final Logger LOGGER = LoggerFactory.getLogger(ParquetFileMergingCommitPolicy.class);

    @Override
    public void commit(Context context) throws Exception {
        LOGGER.info("begin to merge files.partition path is {}.", context.partitionPath().toUri().toString());
        Configuration conf = new Configuration();
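        // use the host of the partition path URI as fs.defaultFS so that
        // FileSystem.get() resolves to the cluster the partition lives on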
        conf.set(FileSystem.FS_DEFAULT_NAME_KEY, context.partitionPath().toUri().getHost());
        FileSystem fs = FileSystem.get(conf);
        String partitionPath = context.partitionPath().getPath();


        List<Path> files = listAllFiles(fs, new Path(partitionPath), "part-");
        LOGGER.info("{} files in path {}", files.size(), partitionPath); // the number of files to be merged is logged correctly here


        MessageType schema = getParquetSchema(files, conf);
        if (schema == null) {
            return;
        }
        LOGGER.info("Fetched parquet schema: {}", schema.toString()); // the schema is also logged correctly


        Path result = merge(partitionPath, schema, files, fs);
        LOGGER.info("Files merged into {}", result.toString());
    }


    private List<Path> listAllFiles(FileSystem fs, Path dir, String prefix) throws IOException {
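        // non-recursive listing of the partition directory, keeping only files whose names start with the prefix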
        List<Path> result = new ArrayList<>();


        RemoteIterator<LocatedFileStatus> dirIterator = fs.listFiles(dir, false);
        while (dirIterator.hasNext()) {
            LocatedFileStatus fileStatus = dirIterator.next();
            Path filePath = fileStatus.getPath();
            if (fileStatus.isFile() && filePath.getName().startsWith(prefix)) {
                result.add(filePath);
            }
        }


        return result;
    }


    private MessageType getParquetSchema(List<Path> files, Configuration conf) throws IOException {
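        // take the schema from the first file's footer (all files in the partition are assumed to share it)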
        if (files.size() == 0) {
            return null;
        }


        HadoopInputFile inputFile = HadoopInputFile.fromPath(files.get(0), conf);
        ParquetFileReader reader = ParquetFileReader.open(inputFile);
        ParquetMetadata metadata = reader.getFooter();
        MessageType schema = metadata.getFileMetaData().getSchema();


        reader.close();
        return schema;
    }


    private Path merge(String partitionPath, MessageType schema, List<Path> files, FileSystem fs) throws IOException {
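        // write all rows from the small files into a single result-<timestamp>.parquet in the
        // same partition directory, then delete the originals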
        Path mergeDest = new Path(partitionPath + "/result-" + System.currentTimeMillis() + ".parquet");

        ParquetWriter<Group> writer = ExampleParquetWriter.builder(mergeDest)
                .withType(schema)
                .withConf(fs.getConf())
                .withWriteMode(Mode.OVERWRITE)
                .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_1_0)
                .withCompressionCodec(CompressionCodecName.SNAPPY)
                .build();
        

        for (Path file : files) {
            ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), file)
                    .withConf(fs.getConf())
                    .build();
            Group data;
            while ((data = reader.read()) != null) {
                writer.write(data);
            }
            reader.close();
        }
        LOGGER.info("data size is [{}]", writer.getDataSize()); // the data size is also logged correctly

        try {
            writer.close();
        } catch (Exception e) {
            LOGGER.error("flush failed!!!!", e); // no exception is thrown here
        }

        if (!fs.exists(mergeDest)) {
            LOGGER.warn("merged result file {} does not exist!", mergeDest);
        }

        for (Path file : files) {
            fs.delete(file, false);
        }
        return mergeDest;
    }
}
I took a quick look at the ParquetWriter source code:
ParquetWriter writer = ExampleParquetWriter.builder(mergeDest)
                .withType(schema)
                .withConf(fs.getConf())
                .withWriteMode(Mode.CREATE)
                .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_1_0)
                .withCompressionCodec(CompressionCodecName.SNAPPY)
                .build()
The file is only created by the final build() call, so file creation must already be failing at that step.
I also tried creating the file directly with FileSystem.create: the file gets created, but write() still does not put any data into it.
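
To narrow this down, I am also planning to run a small standalone check outside Flink to see how the merge destination resolves when the policy's fs.defaultFS trick is applied to a Hive-table partition path. A rough sketch (the partition path below is just a placeholder and the class name is mine):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class MergeDestCheck {
    public static void main(String[] args) throws Exception {
        // placeholder partition path; in the real job this comes from context.partitionPath()
        Path partition = new Path("hdfs://xxx/test.db/sink_to_hive/day=2020-11-18/hour=03");

        // obtain the FileSystem the same way the policy does
        Configuration conf = new Configuration();
        conf.set(FileSystem.FS_DEFAULT_NAME_KEY, partition.toUri().getHost());
        FileSystem fs = FileSystem.get(conf);

        Path mergeDest = new Path(partition.toUri().getPath()
                + "/result-" + System.currentTimeMillis() + ".parquet");

        // which filesystem are we actually talking to, and how does the destination resolve?
        System.out.println("fs uri           = " + fs.getUri());
        System.out.println("qualified dest   = " + fs.makeQualified(mergeDest));
        System.out.println("partition exists = " + fs.exists(partition));
    }
}

If the qualified destination does not point at the expected HDFS path for the Hive table's partition directory, that would explain why the merged file never appears.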

Code for writing to HDFS:
CREATE TABLE test_kafka (
    tuid STRING,
    device STRING,
    active_time BIGINT,
    process_time BIGINT,
    pkg_cn_name STRING,
    pkg_en_name STRING,
    os STRING,
    appid INT,
    dtu STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'test_kafka',
    'properties.bootstrap.servers' = 'xxx:9092',
    'properties.group.id' = 'test-1',
    'scan.startup.mode' = 'latest-offset',
    'format' = 'json',
    'properties.flink.partition-discovery.interval-millis' = '300000'
);

CREATE TABLE test_hdfs (
    `day` STRING,
    `hour` STRING,
    tuid STRING,
    device STRING,
    active_time BIGINT,
    process_time BIGINT,
    pkg_cn_name STRING,
    pkg_en_name STRING,
    os STRING,
    appid INT,
    dtu STRING
) PARTITIONED BY (`day`, `hour`) WITH (
    'connector' = 'filesystem',
    'format' = 'parquet',
    'path' = 'hdfs://xxx/test.db/test_flink_sql',
    'parquet.compression'='SNAPPY',
    'sink.partition-commit.policy.kind' = 'success-file,custom',
    'sink.partition-commit.success-file.name' = '_SUCCESS',
    'sink.partition-commit.policy.class' = 'policy.ParquetFileMergingCommitPolicy'
);

insert into test_hdfs

select
    from_unixtime(process_time,'yyyy-MM-dd') as `day`,
    from_unixtime(process_time,'HH') as `hour`,
    tuid,
    device,
    active_time,
    process_time,
    pkg_cn_name,
    pkg_en_name,
    os,
    appid,
    dtu
from test_kafka;
Code for writing to Hive:
public static void main(String[] args) {

        StreamExecutionEnvironment env = EnvUtil.OptionEOSEnv(60L);

        EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
        tableEnv.getConfig().setIdleStateRetentionTime(Time.minutes(60),
                Time.minutes(120));

        String name            = "myhive";
        String defaultDatabase = "test";
        String hiveConfDir     = "/opt/hive-conf"; // a local path

        HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir);
        tableEnv.registerCatalog("myhive", hive);
        tableEnv.useCatalog("myhive");
        tableEnv.executeSql("DROP TABLE IF EXISTS test_kafka");
        tableEnv.executeSql("CREATE TABLE test_kafka (\n" +
                "    tuid STRING,\n" +
                "    device STRING,\n" +
                "    active_time BIGINT,\n" +
                "    process_time BIGINT,\n" +
                "    pkg_cn_name STRING,\n" +
                "    pkg_en_name STRING,\n" +
                "    os STRING,\n" +
                "    appid INT,\n" +
                "    dtu STRING\n" +
                ") WITH (\n" +
                "    'connector' = 'kafka',\n" +
                "    'topic' = 'test_kafka',\n" +
                "    'properties.bootstrap.servers' = 'xxx:9092',\n" +
                "    'properties.group.id' = 'test-2',\n" +
                "    'scan.startup.mode' = 'latest-offset',\n" +
                "    'format' = 'json',\n" +
                "    'properties.flink.partition-discovery.interval-millis' = '300000'\n" +
                ")");

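        // switch to the Hive dialect so the following DDL is parsed as Hive SQL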
        tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
        tableEnv.executeSql("DROP TABLE IF EXISTS test.sink_to_hive");
        tableEnv.executeSql("CREATE EXTERNAL TABLE test.sink_to_hive (\n" +
                "    tuid STRING,\n" +
                "    device STRING,\n" +
                "    active_time BIGINT,\n" +
                "    process_time BIGINT,\n" +
                "    pkg_cn_name STRING,\n" +
                "    pkg_en_name STRING,\n" +
                "    os STRING,\n" +
                "    appid INT,\n" +
                "    dtu STRING\n" +
                ") PARTITIONED BY (`day` STRING, `hour` STRING) STORED AS PARQUET\n" +
                "TBLPROPERTIES (\n" +
                "    'parquet.compression'='SNAPPY',\n" +
                "    'sink.partition-commit.policy.kind' = 'metastore,success-file,custom',\n" +
                "    'sink.partition-commit.success-file.name' = '_SUCCESS',\n" +
                "    'sink.partition-commit.policy.class' = 'policy.ParquetFileMergingCommitPolicy'\n" +
                ")");

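        // switch back to the default dialect for the INSERT query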
        tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
        tableEnv.executeSql("insert into test.sink_to_hive\n" +
                "select\n" +
                "    tuid,\n" +
                "    device,\n" +
                "    active_time,\n" +
                "    process_time,\n" +
                "    pkg_cn_name,\n" +
                "    pkg_en_name,\n" +
                "    os,\n" +
                "    appid,\n" +
                "    dtu,\n" +
                "    from_unixtime(process_time,'yyyy-MM-dd') as `day`,\n" +
                "    from_unixtime(process_time,'HH') as `hour`\n" +
                "from test_kafka");
    }


[1] https://developer.aliyun.com/article/770822