Posted to user-zh@flink.apache.org by admin <17...@163.com> on 2020/11/18 03:41:24 UTC
A problem with merging small files in a custom partition commit policy
Hi,
We have the following requirement: after data is streamed into the table, partitions should be added and small files merged automatically.
I followed an online example of a custom partition commit policy that merges small files [1] and tested it.
When this custom policy is used with the filesystem connector, it merges the files correctly and produces the target file.
Since the built-in metastore policy can only be used on Hive tables, I also tested writing into a Hive table through the Hive catalog. Automatic partition creation works there, but merging small files does not: no merged target file is produced, and no exception is thrown.
What is strange is that the same code works when writing to HDFS but fails when writing to Hive, even though, judging from the source, the Hive sink is also built on StreamingFileSink. I have been debugging this for two days without making progress. Has anyone run into this problem, or does anyone have ideas on how to troubleshoot it?
The policy code is as follows:
package policy;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.table.filesystem.PartitionCommitPolicy;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.ParquetFileWriter.Mode;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.example.ExampleParquetWriter;
import org.apache.parquet.hadoop.example.GroupReadSupport;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.schema.MessageType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ParquetFileMergingCommitPolicy implements PartitionCommitPolicy {

    private static final Logger LOGGER = LoggerFactory.getLogger(ParquetFileMergingCommitPolicy.class);

    @Override
    public void commit(Context context) throws Exception {
        LOGGER.info("begin to merge files. partition path is {}.", context.partitionPath().toUri().toString());
        Configuration conf = new Configuration();
        // point fs.defaultFS at the host taken from the partition path URI
        conf.set(FileSystem.FS_DEFAULT_NAME_KEY, context.partitionPath().toUri().getHost());
        FileSystem fs = FileSystem.get(conf);
        // path component only, without scheme or authority
        String partitionPath = context.partitionPath().getPath();
        List<Path> files = listAllFiles(fs, new Path(partitionPath), "part-");
        LOGGER.info("{} files in path {}", files.size(), partitionPath); // the number of files to merge is printed correctly here
        MessageType schema = getParquetSchema(files, conf);
        if (schema == null) {
            return;
        }
        LOGGER.info("Fetched parquet schema: {}", schema.toString()); // the schema is also printed correctly
        Path result = merge(partitionPath, schema, files, fs);
        LOGGER.info("Files merged into {}", result.toString());
    }

    private List<Path> listAllFiles(FileSystem fs, Path dir, String prefix) throws IOException {
        List<Path> result = new ArrayList<>();
        RemoteIterator<LocatedFileStatus> dirIterator = fs.listFiles(dir, false);
        while (dirIterator.hasNext()) {
            LocatedFileStatus fileStatus = dirIterator.next();
            Path filePath = fileStatus.getPath();
            if (fileStatus.isFile() && filePath.getName().startsWith(prefix)) {
                result.add(filePath);
            }
        }
        return result;
    }

    private MessageType getParquetSchema(List<Path> files, Configuration conf) throws IOException {
        if (files.isEmpty()) {
            return null;
        }
        // read the schema from the footer of the first file
        HadoopInputFile inputFile = HadoopInputFile.fromPath(files.get(0), conf);
        ParquetFileReader reader = ParquetFileReader.open(inputFile);
        ParquetMetadata metadata = reader.getFooter();
        MessageType schema = metadata.getFileMetaData().getSchema();
        reader.close();
        return schema;
    }

    private Path merge(String partitionPath, MessageType schema, List<Path> files, FileSystem fs) throws IOException {
        Path mergeDest = new Path(partitionPath + "/result-" + System.currentTimeMillis() + ".parquet");
        ParquetWriter<Group> writer = ExampleParquetWriter.builder(mergeDest)
                .withType(schema)
                .withConf(fs.getConf())
                .withWriteMode(Mode.OVERWRITE)
                .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_1_0)
                .withCompressionCodec(CompressionCodecName.SNAPPY)
                .build();
        for (Path file : files) {
            ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), file)
                    .withConf(fs.getConf())
                    .build();
            Group data;
            while ((data = reader.read()) != null) {
                writer.write(data);
            }
            reader.close();
        }
        LOGGER.info("data size is [{}]", writer.getDataSize()); // the data size is also printed correctly
        try {
            writer.close();
        } catch (Exception e) {
            LOGGER.error("flush failed!!!!", e); // no exception is logged
        }
        if (!fs.exists(mergeDest)) {
            LOGGER.warn("Result file {} does not exist.", mergeDest);
        }
        for (Path file : files) {
            fs.delete(file, false);
        }
        return mergeDest;
    }
}
I took a quick look at the ParquetWriter source code:
ParquetWriter<Group> writer = ExampleParquetWriter.builder(mergeDest)
        .withType(schema)
        .withConf(fs.getConf())
        .withWriteMode(Mode.CREATE)
        .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_1_0)
        .withCompressionCodec(CompressionCodecName.SNAPPY)
        .build()
The target file is only created in the final build() call, so it looks like creating the file already fails at this step.
I also tried creating the file through FileSystem.create: the file can be created, but the writer still does not write anything into it.
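The next thing I plan to check is which filesystem the merge destination actually resolves to. This is only a rough sketch I put together for debugging, not code from [1], and the method name logResolvedDestination is just illustrative. Since mergeDest is built from context.partitionPath().getPath(), which carries only the path component without scheme or authority, logging the fully qualified destination inside commit() should show whether the writer is pointing at the HDFS instance I expect:
// Diagnostic sketch only -- for debugging, not part of the policy from [1].
// Assumes the same imports as the class above, plus java.net.URI.
private void logResolvedDestination(Context context, Configuration conf) throws IOException {
    // Fully qualified partition path as Flink reports it (scheme/authority included if present).
    URI partitionUri = context.partitionPath().toUri();
    LOGGER.info("partition uri = {}", partitionUri);

    // Filesystem that the partition path itself resolves to.
    FileSystem partitionFs = FileSystem.get(partitionUri, conf);
    LOGGER.info("partition fs = {}", partitionFs.getUri());

    // The merge destination is built from getPath() only, i.e. without scheme/authority.
    Path dest = new Path(partitionUri.getPath() + "/result-debug.parquet");
    // Resolve it the same way a writer using this conf would.
    FileSystem destFs = dest.getFileSystem(conf);
    LOGGER.info("dest fs = {}, qualified dest = {}", destFs.getUri(), destFs.makeQualified(dest));
}
If the qualified destination turns out to point at a different filesystem in the Hive job, that would at least explain why the writer reports a data size but the file never shows up under the partition path.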
The to-HDFS code:
CREATE TABLE test_kafka (
tuid STRING,
device STRING,
active_time BIGINT,
process_time BIGINT,
pkg_cn_name STRING,
pkg_en_name STRING,
os STRING,
appid INT,
dtu STRING
) WITH (
'connector' = 'kafka',
'topic' = 'test_kafka',
'properties.bootstrap.servers' = 'xxx:9092',
'properties.group.id' = 'test-1',
'scan.startup.mode' = 'latest-offset',
'format' = 'json',
'properties.flink.partition-discovery.interval-millis' = '300000'
);
CREATE TABLE test_hdfs (
`day` STRING,
`hour` STRING,
tuid STRING,
device STRING,
active_time BIGINT,
process_time BIGINT,
pkg_cn_name STRING,
pkg_en_name STRING,
os STRING,
appid INT,
dtu STRING
) PARTITIONED BY (`day`, `hour`) WITH (
'connector' = 'filesystem',
'format' = 'parquet',
'path' = 'hdfs://xxx/test.db/test_flink_sql',
'parquet.compression'='SNAPPY',
'sink.partition-commit.policy.kind' = 'success-file,custom',
'sink.partition-commit.success-file.name' = '_SUCCESS',
'sink.partition-commit.policy.class' = 'policy.ParquetFileMergingCommitPolicy'
);
insert into test_hdfs
select
from_unixtime(process_time,'yyyy-MM-dd') as `day`,
from_unixtime(process_time,'HH') as `hour`,
tuid,
device,
active_time,
process_time,
pkg_cn_name,
pkg_en_name,
os,
appid,
dtu
from test_kafka;
The to-Hive code:
public static void main(String[] args) {
    StreamExecutionEnvironment env = EnvUtil.OptionEOSEnv(60L);
    EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
    tableEnv.getConfig().setIdleStateRetentionTime(Time.minutes(60), Time.minutes(120));

    String name = "myhive";
    String defaultDatabase = "test";
    String hiveConfDir = "/opt/hive-conf"; // a local path
    HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir);
    tableEnv.registerCatalog("myhive", hive);
    tableEnv.useCatalog("myhive");

    tableEnv.executeSql("DROP TABLE IF EXISTS test_kafka");
    tableEnv.executeSql("CREATE TABLE test_kafka (\n" +
            " tuid STRING,\n" +
            " device STRING,\n" +
            " active_time BIGINT,\n" +
            " process_time BIGINT,\n" +
            " pkg_cn_name STRING,\n" +
            " pkg_en_name STRING,\n" +
            " os STRING,\n" +
            " appid INT,\n" +
            " dtu STRING\n" +
            ") WITH (\n" +
            " 'connector' = 'kafka',\n" +
            " 'topic' = 'test_kafka',\n" +
            " 'properties.bootstrap.servers' = 'xxx:9092',\n" +
            " 'properties.group.id' = 'test-2',\n" +
            " 'scan.startup.mode' = 'latest-offset',\n" +
            " 'format' = 'json',\n" +
            " 'properties.flink.partition-discovery.interval-millis' = '300000'\n" +
            ")");

    tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
    tableEnv.executeSql("DROP TABLE IF EXISTS test.sink_to_hive");
    tableEnv.executeSql("CREATE EXTERNAL TABLE test.sink_to_hive (\n" +
            " tuid STRING,\n" +
            " device STRING,\n" +
            " active_time BIGINT,\n" +
            " process_time BIGINT,\n" +
            " pkg_cn_name STRING,\n" +
            " pkg_en_name STRING,\n" +
            " os STRING,\n" +
            " appid INT,\n" +
            " dtu STRING\n" +
            ") PARTITIONED BY (`day` STRING, `hour` STRING) STORED AS PARQUET\n" +
            "TBLPROPERTIES (\n" +
            " 'parquet.compression'='SNAPPY',\n" +
            " 'sink.partition-commit.policy.kind' = 'metastore,success-file,custom',\n" +
            " 'sink.partition-commit.success-file.name' = '_SUCCESS',\n" +
            " 'sink.partition-commit.policy.class' = 'policy.ParquetFileMergingCommitPolicy'\n" +
            ")");

    tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
    tableEnv.executeSql("insert into test.sink_to_hive\n" +
            "select\n" +
            " tuid,\n" +
            " device,\n" +
            " active_time,\n" +
            " process_time,\n" +
            " pkg_cn_name,\n" +
            " pkg_en_name,\n" +
            " os,\n" +
            " appid,\n" +
            " dtu,\n" +
            " from_unixtime(process_time,'yyyy-MM-dd') as `day`,\n" +
            " from_unixtime(process_time,'HH') as `hour`\n" +
            "from test_kafka");
}
[1] https://developer.aliyun.com/article/770822