Posted to user-zh@flink.apache.org by chuyuan <wl...@163.com> on 2020/09/21 06:55:44 UTC
Flink consumes JSON data from Kafka where one value is itself JSON; writing to a Hive table MAP column throws an exception
Hello everyone, I am consuming JSON data from Kafka with Flink. A sample message:
{
"properties":{
"platformType":"APP",
"$os":"iOS",
"$screen_width":414,
"$app_version":"1.0",
"$is_first_day":false,
"$model":"x86_64",
"$device_id":"7B2363CF-1491-4EFF-A096-9ADD6CCA5BC9",
"imei":"c75fa3ea670bebc01cf08f1750745d65d9f33dd3",
"isLogin":false,
"zrIdfa":"00000000-0000-0000-0000-000000000000",
"$network_type":"WIFI",
"$wifi":true,
"$timezone_offset":-480,
"$resume_from_background":false,
"tdid":"",
"zrIdfv":"7B2363CF-1491-4EFF-A096-9ADD6CCA5BC9",
"$screen_height":896,
"$lib_version":"2.0.10",
"$lib":"iOS",
"$os_version":"13.4.1",
"$manufacturer":"Apple",
"$is_first_time":false,
"$app_id":"Com.ziroom..ZRSensorsSDK"
},
"type":"track",
"lib":{
"$lib_version":"2.0.10",
"$lib":"iOS",
"$app_version":"1.0",
"$lib_method":"autoTrack"
}
}
The values under the keys lib and properties are themselves JSON objects whose fields can be appended dynamically. I first deserialize each message into a DO; then, to turn the lib and properties values into Map<String, String>, I convert it into the following DTO:
@Data
public static class CustomBuriedPointDTO {
    /**
     * Tracking ID
     */
    private Long track_id;
    /**
     * Event time
     */
    private Long event_time;
    /**
     * Type
     */
    private String type;
    /**
     * De-duplicated ID
     */
    private String distinct_id;
    /**
     * Anonymous ID
     */
    private String anonymous_id;
    /**
     * Lib info
     */
    private @DataTypeHint("RAW") Map<String, String> lib;
    /**
     * Event
     */
    private String event;
    /**
     * Properties
     */
    // private Map<String, String> properties;
    private @DataTypeHint("RAW") Map<String, String> properties;
    /**
     * Flush time
     */
    private Long flush_time;
    /**
     * Event date (partition column)
     */
    private String dt;

    /**
     * Populate this DTO's fields from the DO.
     */
    public void assembly(CustomBuriedPointDO pointDO) {
        // Copy the DO's properties onto this DTO
        BeanUtils.copyProperties(pointDO, this);
        /*
         * Convert the special fields
         */
        // Set the partition date
        Long eventTimeLong = pointDO.getEvent_time();
        if (eventTimeLong == null) {
            eventTimeLong = System.currentTimeMillis();
        }
        Date eventTime = new Date(eventTimeLong);
        DateFormat dateFormatDate = new SimpleDateFormat("yyyy-MM-dd");
        this.setDt(dateFormatDate.format(eventTime));
        // Convert the JSON string fields to Map
        Map<String, String> propertiesMap = null;
        if (StringUtils.isNotBlank(pointDO.getProperties())) {
            propertiesMap = (Map<String, String>)
                    JSON.parse(pointDO.getProperties());
        }
        this.setProperties(propertiesMap);
        Map<String, String> libMap = null;
        if (StringUtils.isNotBlank(pointDO.getLib())) {
            libMap = (Map<String, String>) JSON.parse(pointDO.getLib());
        }
        this.setLib(libMap);
    }
}
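One side note on the cast above: fastjson's JSON.parse on a nested object yields values with their JSON types (numbers, booleans), so a blind cast to Map<String, String> can later throw ClassCastException for fields like "$screen_width":414. A stdlib-only sketch (MapStringify and toStringMap are illustrative names, not part of my code) of converting each value explicitly instead:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class MapStringify {
    // Convert a Map with arbitrary Object values (as produced by a JSON
    // parser) into a Map<String, String> by stringifying each value,
    // instead of relying on an unchecked cast.
    static Map<String, String> toStringMap(Map<String, Object> raw) {
        Map<String, String> out = new LinkedHashMap<>();
        for (Map.Entry<String, Object> e : raw.entrySet()) {
            out.put(e.getKey(),
                    e.getValue() == null ? null : String.valueOf(e.getValue()));
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, Object> raw = new LinkedHashMap<>();
        raw.put("$os", "iOS");
        raw.put("$screen_width", 414);  // numeric, not a String
        raw.put("$wifi", true);         // boolean, not a String
        System.out.println(toStringMap(raw).get("$screen_width")); // prints 414
    }
}
```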
Then I convert the DataStream into a temporary table and write it into a Hive table, defined as follows:
CREATE TABLE test.test (
  type STRING,
  lib MAP<STRING, STRING>,
  properties MAP<STRING, STRING>
) PARTITIONED BY (
  dt STRING
) STORED AS orcfile
TBLPROPERTIES (
  'partition.time-extractor.kind' = 'custom',
  'partition.time-extractor.timestamp-pattern' = '$dt',
  'partition.time-extractor.class' = 'com.ziroom.dataaccess.module.SensorsPartTimeExtractor',
  'sink.partition-commit.trigger' = 'partition-time',
  'sink.partition-commit.delay' = '0s',
  'sink.partition-commit.policy.kind' = 'metastore'
)
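For context, the custom SensorsPartTimeExtractor referenced above only has to turn the dt partition value ("yyyy-MM-dd") into a timestamp. Its core conversion boils down to the following stdlib-only sketch (the Flink PartitionTimeExtractor plumbing is omitted, and the class name DtToTimestamp is just illustrative):

```java
import java.time.LocalDate;
import java.time.LocalDateTime;

public class DtToTimestamp {
    // Map a "yyyy-MM-dd" partition value to the start-of-day timestamp,
    // which is what partition-time commit triggering compares against.
    static LocalDateTime extract(String dt) {
        return LocalDate.parse(dt).atStartOfDay();
    }

    public static void main(String[] args) {
        System.out.println(extract("2020-09-21")); // prints 2020-09-21T00:00
    }
}
```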
This fails with the following exception:
org.apache.flink.table.api.TableException: A raw type backed by type
information has no serializable string representation. It needs to be
resolved into a proper raw type.
I then printed the schema of the temporary table, and lib and properties have these types:
|-- lib: LEGACY('RAW', 'ANY<java.util.Map>')
|-- properties: LEGACY('RAW', 'ANY<java.util.Map>')
|-- track_id: BIGINT
|-- type: STRING
So lib with type LEGACY('RAW', 'ANY<java.util.Map>') cannot be matched to lib
MAP<STRING,STRING>, and the write fails. Has anyone run into a similar problem? Any advice would be appreciated.
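In case it matters: the hint string "RAW" explicitly asks Flink for a raw type, so I wonder whether an explicit map hint would let the planner derive a proper MAP<STRING, STRING> instead. A sketch of the variant I have in mind (untested on my side, assuming the hint string is accepted by the type extractor):

```java
import java.util.Map;
import org.apache.flink.table.annotation.DataTypeHint;

public class HintedDTO {
    // Explicit MAP hint instead of RAW, so the derived table schema should
    // carry MAP<STRING, STRING> rather than LEGACY('RAW', ...).
    public @DataTypeHint("MAP<STRING, STRING>") Map<String, String> properties;
}
```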
--
Sent from: http://apache-flink.147419.n8.nabble.com/