Posted to issues@flink.apache.org by "颖 (Jira)" <ji...@apache.org> on 2020/05/06 12:10:00 UTC

[jira] [Resolved] (FLINK-17438) Flink StreamingFileSink Chinese garbled

     [ https://issues.apache.org/jira/browse/FLINK-17438?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

颖 resolved FLINK-17438.
-----------------------
    Resolution: Fixed

> Flink StreamingFileSink Chinese garbled
> ---------------------------------------
>
>                 Key: FLINK-17438
>                 URL: https://issues.apache.org/jira/browse/FLINK-17438
>             Project: Flink
>          Issue Type: Bug
>          Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>    Affects Versions: 1.10.0
>         Environment: CDH6.0.1 hadoop3.0.0 Flink 1.10.0 
>            Reporter: 颖
>            Priority: Blocker
>
> import org.apache.flink.core.fs.Path
> import org.apache.flink.formats.compress.CompressWriterFactory
> import org.apache.flink.formats.compress.extractor.DefaultExtractor
> import org.apache.flink.streaming.api.functions.sink.filesystem.{OutputFileConfig, StreamingFileSink}
>
> // Bulk-format writer: Hadoop Snappy compression around the default line-based extractor.
> val writer: CompressWriterFactory[String] =
>   new CompressWriterFactory[String](new DefaultExtractor[String]())
>     .withHadoopCompression("SnappyCodec") // codec name normally comes from ${compress}
> // Output file naming.
> val fileConfig = OutputFileConfig.builder()
>   .withPartPrefix(prefix)
>   .withPartSuffix(suffix)
>   .build()
> // Custom bucket assigner and file naming, chained on the bulk-format builder.
> val sink = StreamingFileSink.forBulkFormat(new Path(output), writer)
>   .withBucketAssigner(new DemoAssigner())
>   .withOutputFileConfig(fileConfig)
>   .build()
> // Row-format alternative with a rolling policy:
> // val rollingPolicy = DefaultRollingPolicy.builder().withRolloverInterval(TimeUnit.MINUTES.toMillis(5)).withInactivityInterval(TimeUnit.MINUTES.toMillis(3)).withMaxPartSize(1 * 1024 * 1024)
> // val sink = StreamingFileSink.forRowFormat(new Path(output), new SimpleStringEncoder[String]()).withRollingPolicy(rollingPolicy.build()).build()
> ds.map(_.log).addSink(sink).setParallelism(fileNum).name("snappy sink to hdfs")
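> 
> If the garbling described below turns out to be an encoding issue rather than a Snappy issue, the extractor is one place to check: DefaultExtractor converts each record with a charset-less String.getBytes(), so the bytes written depend on the JVM's default charset. A minimal sketch of a UTF-8-pinned replacement (Utf8Extractor is a hypothetical class, not part of Flink):
> 
> import java.nio.charset.StandardCharsets
> import org.apache.flink.formats.compress.extractor.Extractor
>
> class Utf8Extractor extends Extractor[String] {
>   // Encode as UTF-8 regardless of the JVM's file.encoding.
>   override def extract(element: String): Array[Byte] =
>     (element + System.lineSeparator()).getBytes(StandardCharsets.UTF_8)
> }
> // Usage: new CompressWriterFactory[String](new Utf8Extractor).withHadoopCompression("SnappyCodec")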
>  
> Writing to HDFS through the Flink API this way, the log contains Chinese fields, and the corresponding text comes out garbled after Hive parses the files. The table DDL:
> CREATE EXTERNAL TABLE `demo_app`(
>  `str` string COMMENT 'raw record JSON')
> COMMENT 'app Flink tracking log'
> PARTITIONED BY ( 
>  `ymd` string COMMENT 'date partition yyyymmdd')
> ROW FORMAT SERDE 
>  'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' 
> STORED AS INPUTFORMAT 
>  'org.apache.hadoop.mapred.TextInputFormat' 
> OUTPUTFORMAT 
>  'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
> LOCATION
>  'hdfs://nameservice1/user/xxx/inke_back.db'
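> 
> The DDL itself carries no charset setting: LazySimpleSerDe passes the bytes through and Hive treats text as UTF-8. A quick diagnostic worth running on the TaskManagers (an assumption to verify, not something confirmed in this ticket) is the JVM default charset, since every charset-less conversion in the job inherits it:
> 
> import java.nio.charset.Charset
> // Anything other than UTF-8 here will mangle Chinese text in
> // charset-less getBytes()/new String(bytes) calls.
> println(Charset.defaultCharset()) // expected: UTF-8
> 
> If it is not UTF-8, adding -Dfile.encoding=UTF-8 via env.java.opts in flink-conf.yaml is one way to pin it.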
> Kafka source data:
> {"name":"inke.dcc.flume.collect","type":"flume","status":"完成","batchDuration":3000,"proccessDelay":0,"shedulerDelay":0,"topic":"newserverlog_opd_operate_log","endpoint":"ali-a-opd-script01.bj","batchId":"xxx","batchTime":1588065997320,"numRecords":-1,"numBytes":-1,"totalRecords":0,"totalBytes":0,"ipAddr":"10.111.27.230"}
>  
> Hive data:
> {"name":"inke.dcc.flume.collect","type":"flume","status":"������","batchDuration":3000,"proccessDelay":0,"shedulerDelay":0,"topic":"newserverlog_opd_operate_log","endpoint":"ali-a-opd-script01.bj","batchId":"xxx","batchTime":1588065997320,"numRecords":-1,"numBytes":-1,"totalRecords":0,"totalBytes":0,"ipAddr":"10.111.27.230"}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)