You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by kcz <57...@qq.com.INVALID> on 2021/07/01 14:49:45 UTC

flink-1.13.1 ddl kafka消费JSON数据 (ObjectNode) jsonNode错误

版本:1.13.1 报错信息如下:
Caused by: java.lang.ClassCastException: org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.TextNode cannot be cast to org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode
	at org.apache.flink.formats.json.JsonToRowDataConverters.lambda$createRowConverter$ef66fe9a$1(JsonToRowDataConverters.java:344)
	at org.apache.flink.formats.json.JsonToRowDataConverters.lambda$wrapIntoNullableConverter$de0b9253$1(JsonToRowDataConverters.java:376)
	at org.apache.flink.formats.json.JsonRowDataDeserializationSchema.convertToRowData(JsonRowDataDeserializationSchema.java:121)
	at org.apache.flink.formats.json.JsonRowDataDeserializationSchema.deserialize(JsonRowDataDeserializationSchema.java:106)



DDL定义如下:
CREATE TABLE user_behavior (
&nbsp; &nbsp; user_id string&nbsp;
) WITH (
&nbsp;'connector' = 'kafka',
&nbsp; 'topic' = 'user_behavior',
&nbsp; 'properties.bootstrap.servers' = 'localhost:9092',
&nbsp; 'properties.group.id' = 'testGroup',
&nbsp; 'scan.startup.mode' = 'latest-offset',
&nbsp; 'format' = 'json'
);
select * from user_behavior;

主要代码如下:
&nbsp; &nbsp; &nbsp; &nbsp; StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.createLocalEnvironment(new Configuration());
&nbsp; &nbsp; &nbsp; &nbsp; StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv);
&nbsp; &nbsp; &nbsp; &nbsp; tableEnv.executeSql(sql).print();

     pom文件如下:


      &nbsp; &nbsp; &nbsp; &nbsp; <dependency&gt;
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; <groupId&gt;org.apache.flink</groupId&gt;
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; <artifactId&gt;flink-table-api-java</artifactId&gt;
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; <version&gt;${flink.version}</version&gt;
&nbsp; &nbsp; &nbsp; &nbsp; </dependency&gt;
&nbsp; &nbsp; &nbsp; &nbsp; <dependency&gt;
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; <groupId&gt;org.apache.flink</groupId&gt;
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; <artifactId&gt;flink-table-planner-blink_${scala.version}</artifactId&gt;
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; <version&gt;${flink.version}</version&gt;
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;
&nbsp; &nbsp; &nbsp; &nbsp; </dependency&gt;
&nbsp; &nbsp; &nbsp; &nbsp; <dependency&gt;
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; <groupId&gt;org.apache.flink</groupId&gt;
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; <artifactId&gt;flink-connector-hive_${scala.version}</artifactId&gt;
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; <version&gt;${flink.version}</version&gt;
&nbsp; &nbsp; &nbsp; &nbsp; </dependency&gt;


&nbsp; &nbsp; &nbsp; &nbsp; <dependency&gt;
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; <groupId&gt;org.apache.flink</groupId&gt;
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; <artifactId&gt;flink-table-api-java-bridge_${scala.version}</artifactId&gt;
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; <version&gt;${flink.version}</version&gt;
&nbsp; &nbsp; &nbsp; &nbsp; </dependency&gt;


&nbsp; &nbsp; &nbsp; &nbsp; <dependency&gt;
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; <groupId&gt;org.apache.flink</groupId&gt;
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; <artifactId&gt;flink-table-planner_${scala.version}</artifactId&gt;
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; <version&gt;${flink.version}</version&gt;
&nbsp; &nbsp; &nbsp; &nbsp; </dependency&gt;


&nbsp; &nbsp; &nbsp; &nbsp; <dependency&gt;
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; <groupId&gt;org.apache.flink</groupId&gt;
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; <artifactId&gt;flink-clients_${scala.version}</artifactId&gt;
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; <version&gt;${flink.version}</version&gt;
&nbsp; &nbsp; &nbsp; &nbsp; </dependency&gt;


&nbsp; &nbsp; &nbsp; &nbsp; <dependency&gt;
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; <groupId&gt;org.apache.flink</groupId&gt;
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; <artifactId&gt;flink-connector-kafka_2.11</artifactId&gt;
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; <version&gt;${flink.version}</version&gt;
&nbsp; &nbsp; &nbsp; &nbsp; </dependency&gt;
&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;
&nbsp; &nbsp; &nbsp; &nbsp; <dependency&gt;
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; <groupId&gt;org.apache.flink</groupId&gt;
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; <artifactId&gt;flink-json</artifactId&gt;
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; <version&gt;${flink.version}</version&gt;
&nbsp; &nbsp; &nbsp; &nbsp; </dependency&gt;