You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by 小昌同学 <cc...@163.com> on 2023/04/14 11:17:05 UTC

流数据转化为json

你好,请问一下上游的数据是
SingleOutputStreamOperator<InPutInfo> outPutInfoStream = keyedStream.process(new KeyStreamFunc());
数据样式为:InPutInfo[phone='123456',workId='001']
我想直接将这个流输入到kafka中,直接使用addsink算子,但是查看kafka日志发现,数据内容没有插入进来,想请教一下有没有什么解决方案;
我现在自己想着将流中的数据转换为json,但是我使用了gson以及fastjson都不行,请各位大佬指点
| |
小昌同学
|
|
ccc0606fighting@163.com
|

Re: 流数据转化为json

Posted by Weihua Hu <hu...@gmail.com>.
Hi,

你使用的那个 Flink 版本,建议直接参考 Flink 官方 kafka connector 文档[1]。
转换为 Json 数据格式可以使用 flink-json  format

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#kafka-sink
[2]
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/formats/json/

Best,
Weihua


On Fri, Apr 14, 2023 at 7:17 PM 小昌同学 <cc...@163.com> wrote:

> 你好,请问一下上游的数据是
> SingleOutputStreamOperator<InPutInfo> outPutInfoStream =
> keyedStream.process(new KeyStreamFunc());
> 数据样式为:InPutInfo[phone='123456',workId='001']
> 我想直接将这个流输入到kafka中,直接使用addsink算子,但是查看kafka日志发现,数据内容没有插入进来,想请教一下有没有什么解决方案;
> 我现在自己想着将流中的数据转换为json,但是我使用了gson以及fastjson都不行,请各位大佬指点
> | |
> 小昌同学
> |
> |
> ccc0606fighting@163.com
> |

udf函数不能使用DataTypeHint("Row>")

Posted by Jeff <zi...@126.com>.
在自定义函数中使用DataTypeHint("Row<t ARRAY<INT>>")时报错,错误内容为:


Caused by: java.lang.ClassCastException: class [I cannot be cast to class [Ljava.lang.Object; ([I and [Ljava.lang.Object; are in module java.base of loader 'bootstrap')
\tat org.apache.flink.table.data.conversion.ArrayObjectArrayConverter.toInternal(ArrayObjectArrayConverter.java:40)
\tat org.apache.flink.table.data.conversion.DataStructureConverter.toInternalOrNull(DataStructureConverter.java:61)
\tat org.apache.flink.table.data.conversion.RowRowConverter.toInternal(RowRowConverter.java:75)
\tat org.apache.flink.table.data.conversion.RowRowConverter.toInternal(RowRowConverter.java:37)
\tat org.apache.flink.table.data.conversion.DataStructureConverter.toInternalOrNull(DataStructureConverter.java:61)
\tat StreamExecCalc$251.processElement_split9(Unknown Source)
\tat StreamExecCalc$251.processElement(Unknown Source)
\tat org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)


函数内容如下:
@DataTypeHint("Row<t ARRAY<INT>>")
 public  Row eval() {
   int[] i = new int[3];
 return Row.of(i);
}


测试其它简单类型时就不会报这个错,所以不是环境问题。

Re: 流数据转化为json

Posted by Shammon FY <zj...@gmail.com>.
Hi

对于kafka的问题,使用print或者其他方式有数据输出吗?可以通过这种方式确认一下是作业本身的数据问题还是kafka的问题

Best,
Shammon FY


On Fri, Apr 14, 2023 at 7:17 PM 小昌同学 <cc...@163.com> wrote:

> 你好,请问一下上游的数据是
> SingleOutputStreamOperator<InPutInfo> outPutInfoStream =
> keyedStream.process(new KeyStreamFunc());
> 数据样式为:InPutInfo[phone='123456',workId='001']
> 我想直接将这个流输入到kafka中,直接使用addsink算子,但是查看kafka日志发现,数据内容没有插入进来,想请教一下有没有什么解决方案;
> 我现在自己想着将流中的数据转换为json,但是我使用了gson以及fastjson都不行,请各位大佬指点
> | |
> 小昌同学
> |
> |
> ccc0606fighting@163.com
> |