Posted to user-zh@flink.apache.org by nashcen <24...@qq.com> on 2020/09/22 08:29:41 UTC

Flink-1.11.1 Kafka Table API BigInt issue

*My code is below.*
The updateTime field is a timestamp, typed as BigInt. If I query only the other String fields, the program runs fine, but as soon as I add this field, it fails with an error.

package com.athub.dcpoints.scala.connector.table.hive

import com.athub.dcpoints.java.model.KafkaDcpoints
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.api.{EnvironmentSettings, SqlDialect, TableEnvironment}
import org.apache.flink.table.catalog.hive.HiveCatalog
import org.apache.flink.table.descriptors._

/**
 * @author Nash Cen
 * @date 2020/9/22 10:33
 * @version 1.0
 */
object KafkaFlinkHiveScalaApp {

  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME","hdfs")

    // 1. Get the execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)

    val settings = EnvironmentSettings.newInstance()
      .useBlinkPlanner()
      .inStreamingMode()
      .build()

    val tableEnv: TableEnvironment = StreamTableEnvironment.create(env, settings)

    // 2. Register the Kafka source table
    tableEnv.connect(new Kafka()
      .version("universal")
      .topic("ods_dcpoints_dev")
      .property("zookeeper.connect", "localhost:2181")
      .property("bootstrap.servers", "localhost:9092")
      //.property("group.id", "testGroup")
      .startFromEarliest()
    )
      .withFormat(new Json())
      .withSchema(new Schema()
        .field("assetSpecId", DataTypes.STRING())
        .field("dcnum", DataTypes.STRING())
        .field("monitorType", DataTypes.STRING())
        .field("tagNo", DataTypes.STRING())
        .field("updateTime", DataTypes.BIGINT())
        .field("value", DataTypes.STRING())

      )
      .createTemporaryTable("kafka_source_table")

    // 3. Query and convert
    // using sqlQuery
    val selectKafkaTable: Table = tableEnv.sqlQuery(
      "select assetSpecId, dcnum, monitorType, tagNo, updateTime from kafka_source_table")

    selectKafkaTable
      .toAppendStream[(String, String, String, String, BigInt)]
      .print("selectKafka")

    env.execute("KafkaFlinkHiveScalaApp")

  }

}


*The error at runtime is as follows:*
[main] INFO org.apache.flink.api.java.typeutils.TypeExtractor - class scala.math.BigInt does not contain a setter for field bigInteger
[main] INFO org.apache.flink.api.java.typeutils.TypeExtractor - Class class scala.math.BigInt cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
Exception in thread "main" org.apache.flink.table.api.TableException: A raw type backed by type information has no serializable string representation. It needs to be resolved into a proper raw type.
	at org.apache.flink.table.types.logical.TypeInformationRawType.asSerializableString(TypeInformationRawType.java:101)
	at org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$3.apply(TableSinkUtils.scala:95)
	at org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$3.apply(TableSinkUtils.scala:95)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)


--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Flink-1.11.1 Kafka Table API BigInt issue

Posted by 刘首维 <li...@autohome.com.cn>.
Hi,

Have you tried Java's BigInteger instead?
