Posted to issues@flink.apache.org by "颖 (Jira)" <ji...@apache.org> on 2020/05/06 12:10:00 UTC
[jira] [Resolved] (FLINK-17438) Flink StreamingFileSink chinese garbled
[ https://issues.apache.org/jira/browse/FLINK-17438?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
颖 resolved FLINK-17438.
-----------------------
Resolution: Fixed
> Flink StreamingFileSink chinese garbled
> ---------------------------------------
>
> Key: FLINK-17438
> URL: https://issues.apache.org/jira/browse/FLINK-17438
> Project: Flink
> Issue Type: Bug
> Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
> Affects Versions: 1.10.0
> Environment: CDH6.0.1 hadoop3.0.0 Flink 1.10.0
> Reporter: 颖
> Priority: Blocker
>
> import org.apache.flink.core.fs.Path
> import org.apache.flink.formats.compress.CompressWriterFactory
> import org.apache.flink.formats.compress.extractor.DefaultExtractor
> import org.apache.flink.streaming.api.functions.sink.filesystem.{OutputFileConfig, StreamingFileSink}
>
> val writer: CompressWriterFactory[String] =
>   new CompressWriterFactory[String](new DefaultExtractor[String]())
>     .withHadoopCompression("SnappyCodec") // originally ${compress}
> val fileConfig = OutputFileConfig.builder()
>   .withPartPrefix(prefix)
>   .withPartSuffix(suffix)
>   .build()
> // Chain the builder calls: withBucketAssigner may return a new builder,
> // so calling it without using the result can drop the custom assigner.
> val sink = StreamingFileSink
>   .forBulkFormat(new Path(output), writer)
>   .withBucketAssigner(new DemoAssigner()) // custom bucketing strategy
>   .withOutputFileConfig(fileConfig)       // custom output file configuration
>   .build()
> // Row-format alternative (commented out); SimpleStringEncoder encodes as UTF-8 by default:
> // val rollingPolicy = DefaultRollingPolicy.builder().withRolloverInterval(TimeUnit.MINUTES.toMillis(5)).withInactivityInterval(TimeUnit.MINUTES.toMillis(3)).withMaxPartSize(1 * 1024 * 1024)
> // val bulkFormatBuilder = StreamingFileSink.forRowFormat(new Path(output), new SimpleStringEncoder[String]()).withRollingPolicy(rollingPolicy.build())
> // val sink = bulkFormatBuilder.build()
> ds.map(_.log).addSink(sink).setParallelism(fileNum).name("snappy sink to hdfs")
>
> The job above writes the data to HDFS through the Flink API. The log contains Chinese fields, and after Hive parses the files those fields come out garbled. The Hive table is defined as follows:
> CREATE EXTERNAL TABLE `demo_app`(
>   `str` string COMMENT 'raw record JSON')
> COMMENT 'app Flink event-tracking log'
> PARTITIONED BY (
>   `ymd` string COMMENT 'date partition, yyyymmdd')
> ROW FORMAT SERDE
>   'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
> STORED AS INPUTFORMAT
>   'org.apache.hadoop.mapred.TextInputFormat'
> OUTPUTFORMAT
>   'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
> LOCATION
>   'hdfs://nameservice1/user/xxx/inke_back.db'
> Kafka source data:
> {"name":"inke.dcc.flume.collect","type":"flume","status":"完成","batchDuration":3000,"proccessDelay":0,"shedulerDelay":0,"topic":"newserverlog_opd_operate_log","endpoint":"ali-a-opd-script01.bj","batchId":"xxx","batchTime":1588065997320,"numRecords":-1,"numBytes":-1,"totalRecords":0,"totalBytes":0,"ipAddr":"10.111.27.230"}
>
> Hive data:
> {"name":"inke.dcc.flume.collect","type":"flume","status":"������","batchDuration":3000,"proccessDelay":0,"shedulerDelay":0,"topic":"newserverlog_opd_operate_log","endpoint":"ali-a-opd-script01.bj","batchId":"xxx","batchTime":1588065997320,"numRecords":-1,"numBytes":-1,"totalRecords":0,"totalBytes":0,"ipAddr":"10.111.27.230"}
>
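One possible cause, offered as an assumption rather than the confirmed root cause of this issue: if the `DefaultExtractor` used above converts each element with a bare `String.getBytes()`, the bytes are encoded with the JVM's platform default charset, while Hive's `LazySimpleSerDe` decodes text as UTF-8 unless configured otherwise. A TaskManager JVM that does not default to UTF-8 would then write the Chinese characters in the wrong encoding. The Scala sketch below (plain JDK, no Flink dependency) prints the JVM default charset, shows how a charset mismatch destroys the text, and defines a hypothetical `utf8Line` helper that pins the encoding to UTF-8. In a job, the same conversion could sit in the `extract` method of a custom `Extractor[String]` passed to `CompressWriterFactory` in place of `DefaultExtractor`.

```scala
import java.nio.charset.{Charset, StandardCharsets}

object EncodingCheck {
  // Encode `text` with `writeCharset`, then decode the bytes as UTF-8, the way
  // a reader that assumes UTF-8 (such as Hive's text SerDe by default) would.
  def roundTrip(text: String, writeCharset: Charset): String =
    new String(text.getBytes(writeCharset), StandardCharsets.UTF_8)

  // Hypothetical helper: element + line separator -> bytes, i.e. the
  // conversion assumed for DefaultExtractor, but with the charset pinned
  // to UTF-8 instead of the JVM's platform default.
  def utf8Line(element: String): Array[Byte] =
    (element + System.lineSeparator()).getBytes(StandardCharsets.UTF_8)

  def main(args: Array[String]): Unit = {
    // The charset a bare String.getBytes() uses on this JVM.
    println(s"JVM default charset: ${Charset.defaultCharset()}")

    val status = "完成"
    // Matching charsets: the text survives the round trip.
    println(roundTrip(status, StandardCharsets.UTF_8) == status) // true
    // Mismatched charsets: US-ASCII cannot represent these characters, so
    // getBytes substitutes '?' for each one and the original text is lost.
    println(roundTrip(status, StandardCharsets.US_ASCII)) // ??
    // Bytes produced by the UTF-8-pinned helper decode cleanly.
    println(new String(utf8Line(status), StandardCharsets.UTF_8).trim == status) // true
  }
}
```

The exact garbage Hive shows depends on which charset the writer actually used; US-ASCII is only one example. Under the same assumption, the configuration-side counterpart would be starting the TaskManager JVMs with `-Dfile.encoding=UTF-8` (for example via `env.java.opts` in `flink-conf.yaml`), which changes the default rather than the code.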
--
This message was sent by Atlassian Jira
(v8.3.4#803005)