You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by 刘首维 <li...@autohome.com.cn> on 2020/09/22 10:24:11 UTC
答复: Flink-1.11.1 Kafka Table API BigInt 问题
Hi,
试一下java的BigInteger呢
________________________________
发件人: nashcen <24...@qq.com>
发送时间: 2020年9月22日 16:29:41
收件人: user-zh@flink.apache.org
主题: Flink-1.11.1 Kafka Table API BigInt 问题
*我的代码如下*
其中 updateTime 字段,是时间戳,用的BigInt。如果只查询其他String类型的字段,程序正常。但是加上这个字段,就会报错。
package com.athub.dcpoints.scala.connector.table.hive
import com.athub.dcpoints.java.model.KafkaDcpoints
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.api.{EnvironmentSettings, SqlDialect,
TableEnvironment}
import org.apache.flink.table.catalog.hive.HiveCatalog
import org.apache.flink.table.descriptors._
/**
* @author Nash Cen
* @date 2020/9/22 10:33
* @version 1.0
*/
object KafkaFlinkHiveScalaApp {
def main(args: Array[String]): Unit = {
System.setProperty("HADOOP_USER_NAME","hdfs")
// 1. 获取执行环境
val env: StreamExecutionEnvironment =
StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)
val settings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build()
val tableEnv: TableEnvironment =
StreamTableEnvironment.create(env,settings)
// 2. 注册 KafkaSource 表
tableEnv.connect(new Kafka()
.version("universal")
.topic("ods_dcpoints_dev")
.property("zookeeper.connect", "localhost:2181")
.property("bootstrap.servers", "localhost:9092")
//.property("group.id", "testGroup")
.startFromEarliest()
)
.withFormat(new Json())
.withSchema(new Schema()
.field("assetSpecId", DataTypes.STRING())
.field("dcnum", DataTypes.STRING())
.field("monitorType", DataTypes.STRING())
.field("tagNo", DataTypes.STRING())
.field("updateTime", DataTypes.BIGINT())
.field("value", DataTypes.STRING())
)
.createTemporaryTable("kafka_source_table")
// 3. 查询转换
// 使用 sqlQuery
val selectKafkaTable: Table = tableEnv.sqlQuery(s"select
assetSpecId,dcnum,monitorType,tagNo,updateTime from kafka_source_table")
selectKafkaTable.toAppendStream[(String,String,String,String,BigInt)].print("selectKafka")
env.execute("KafkaFlinkHiveScalaApp")
}
}
*运行时报错信息如下:*
[main] INFO org.apache.flink.api.java.typeutils.TypeExtractor - class
scala.math.BigInt does not contain a setter for field bigInteger
[main] INFO org.apache.flink.api.java.typeutils.TypeExtractor - Class class
scala.math.BigInt cannot be used as a POJO type because not all fields are
valid POJO fields, and must be processed as GenericType. Please read the
Flink documentation on "Data Types & Serialization" for details of the
effect on performance.
Exception in thread "main" org.apache.flink.table.api.TableException: A raw
type backed by type information has no serializable string representation.
It needs to be resolved into a proper raw type.
at
org.apache.flink.table.types.logical.TypeInformationRawType.asSerializableString(TypeInformationRawType.java:101)
at
org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$3.apply(TableSinkUtils.scala:95)
at
org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$3.apply(TableSinkUtils.scala:95)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
--
Sent from: http://apache-flink.147419.n8.nabble.com/