You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by 小昌同学 <cc...@163.com> on 2023/04/14 11:17:05 UTC
流数据转化为json
你好,请问一下上游的数据是
SingleOutputStreamOperator<InPutInfo> outPutInfoStream = keyedStream.process(new KeyStreamFunc());
数据样式为:InPutInfo[phone='123456',workId='001']
我想直接将这个流输入到kafka中,直接使用addsink算子,但是查看kafka日志发现,数据内容没有插入进来,想请教一下有没有什么解决方案;
我现在自己想着将流中的数据转换为json,但是我使用了gson以及fastjson都不行,请各位大佬指点
| |
小昌同学
|
|
ccc0606fighting@163.com
|
Re: 流数据转化为json
Posted by Weihua Hu <hu...@gmail.com>.
Hi,
你使用的那个 Flink 版本,建议直接参考 Flink 官方 kafka connector 文档[1]。
转换为 Json 数据格式可以使用 flink-json format
[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#kafka-sink
[2]
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/formats/json/
Best,
Weihua
On Fri, Apr 14, 2023 at 7:17 PM 小昌同学 <cc...@163.com> wrote:
> 你好,请问一下上游的数据是
> SingleOutputStreamOperator<InPutInfo> outPutInfoStream =
> keyedStream.process(new KeyStreamFunc());
> 数据样式为:InPutInfo[phone='123456',workId='001']
> 我想直接将这个流输入到kafka中,直接使用addsink算子,但是查看kafka日志发现,数据内容没有插入进来,想请教一下有没有什么解决方案;
> 我现在自己想着将流中的数据转换为json,但是我使用了gson以及fastjson都不行,请各位大佬指点
> | |
> 小昌同学
> |
> |
> ccc0606fighting@163.com
> |
udf函数不能使用DataTypeHint("Row>")
Posted by Jeff <zi...@126.com>.
在自定义函数中使用DataTypeHint("Row<t ARRAY<INT>>")时报错,错误内容为:
Caused by: java.lang.ClassCastException: class [I cannot be cast to class [Ljava.lang.Object; ([I and [Ljava.lang.Object; are in module java.base of loader 'bootstrap')
\tat org.apache.flink.table.data.conversion.ArrayObjectArrayConverter.toInternal(ArrayObjectArrayConverter.java:40)
\tat org.apache.flink.table.data.conversion.DataStructureConverter.toInternalOrNull(DataStructureConverter.java:61)
\tat org.apache.flink.table.data.conversion.RowRowConverter.toInternal(RowRowConverter.java:75)
\tat org.apache.flink.table.data.conversion.RowRowConverter.toInternal(RowRowConverter.java:37)
\tat org.apache.flink.table.data.conversion.DataStructureConverter.toInternalOrNull(DataStructureConverter.java:61)
\tat StreamExecCalc$251.processElement_split9(Unknown Source)
\tat StreamExecCalc$251.processElement(Unknown Source)
\tat org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
函数内容如下:
@DataTypeHint("Row<t ARRAY<INT>>")
public Row eval() {
int[] i = new int[3];
return Row.of(i);
}
测试其它简单类型时就不会报这个错,所以不是环境问题。
Re: 流数据转化为json
Posted by Shammon FY <zj...@gmail.com>.
Hi
对于kafka的问题,使用print或者其他方式有数据输出吗?可以通过这种方式确认一下是作业本身的数据问题还是kafka的问题
Best,
Shammon FY
On Fri, Apr 14, 2023 at 7:17 PM 小昌同学 <cc...@163.com> wrote:
> 你好,请问一下上游的数据是
> SingleOutputStreamOperator<InPutInfo> outPutInfoStream =
> keyedStream.process(new KeyStreamFunc());
> 数据样式为:InPutInfo[phone='123456',workId='001']
> 我想直接将这个流输入到kafka中,直接使用addsink算子,但是查看kafka日志发现,数据内容没有插入进来,想请教一下有没有什么解决方案;
> 我现在自己想着将流中的数据转换为json,但是我使用了gson以及fastjson都不行,请各位大佬指点
> | |
> 小昌同学
> |
> |
> ccc0606fighting@163.com
> |