You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by "laney0606@163.com" <la...@163.com> on 2017/09/25 01:40:35 UTC
Exception : Table of atomic type can only have a single field, when
transferring DataStream to Table ?
Hi,
I‘m confused about a problem, occuring a exception "org.apache.flink.table.api.TableException: Table of atomic type can only have a single field."
Both BillCount and Record are class object. Following is code.
case class BillCount(logisId: Int, provinceId: Int, cityId: Int, orderRequVari: Int, orderRecAmount: Double, orderRecDate: Long)
val kafkaInputStream: DataStream[Record] = env.addSource(source) //source is FlinkKafkaConsumer010 source
val tbDataStream : DataStream[BillCount] = kafkaInputStream.map(
new MapFunction[Record, BillCount] {
override def map(value: Record) = {
BillCount(value.getLogis_id, value.getProvince_id, value.getCity_id,
value.getOrder_require_varieties, value.getOrder_rec_amount, value.getStore_rec_date.getTime)
}
})
val stream = tbDataStream.toTable(tbEnv, 'logisId, 'provinceId, 'cityId, 'orderRequVari, 'orderRecAmount, 'orderRecDate) // occur error here
Error :
Exception in thread "main" org.apache.flink.table.api.TableException: Table of atomic type can only have a single field.
at org.apache.flink.table.api.TableEnvironment$$anonfun$1.apply(TableEnvironment.scala:627)
at org.apache.flink.table.api.TableEnvironment$$anonfun$1.apply(TableEnvironment.scala:624)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
at scala.collection.mutable.ArrayOps$ofRef.flatMap(ArrayOps.scala:108)
at org.apache.flink.table.api.TableEnvironment.getFieldInfo(TableEnvironment.scala:624)
at org.apache.flink.table.api.StreamTableEnvironment.registerDataStreamInternal(StreamTableEnvironment.scala:398)
at org.apache.flink.table.api.scala.StreamTableEnvironment.fromDataStream(StreamTableEnvironment.scala:85)
at org.apache.flink.table.api.scala.DataStreamConversions.toTable(DataStreamConversions.scala:58)
Thanks.
laney0606@163.com
Re: Exception : Table of atomic type can only have a single field,
when transferring DataStream to Table ?
Posted by Timo Walther <tw...@apache.org>.
Hi,
I also replied to your Stackoverflow question. I think the problem is
that BillCount has the wrong type and is therefore treated as one single
black box.
Haohui's suggestion will no work because the row type needs information
about the fields. The easiest thing is to figure out why BillCount has
the wrong type. Make sure that it is defined in a statically.
What type is Record? Maybe you don't need the additional MapFunction but
can use the Table API for mapping.
Regards,
Timo
Am 9/25/17 um 9:29 AM schrieb Haohui Mai:
> Hi,
>
> I think instead of generating DataStream[BillCount], the correct way
> is to generate DataStream[Row], that is,
>
> kafkaInputStream.map(value -> Row.of(value.getLogis_id,
> value.getProvince_id, value.getCity_id,
> value.getOrder_require_varieties, value.getOrder_rec_amount,
> value.getStore_rec_date.getTime)
>
> That should work.
>
> Regards,
> Haohui
>
>
>
> On Sun, Sep 24, 2017 at 6:40 PM laney0606@163.com
> <ma...@163.com> <laney0606@163.com
> <ma...@163.com>> wrote:
>
> Hi,
> I‘m confused about a problem, occuring a exception
> "org.apache.flink.table.api.TableException: Table of atomic type can only have a single field."
>
> Both *BillCount *and***Record *are class object*.*Following is code.
> case class
> *BillCount*(logisId: Int, provinceId: Int, cityId: Int, orderRequVari: Int, orderRecAmount: Double, orderRecDate: Long)
> val kafkaInputStream: DataStream[*Record*] = env.addSource(source)
> //source is FlinkKafkaConsumer010 source
> val tbDataStream : DataStream[*BillCount*] = kafkaInputStream.map(
> new MapFunction[Record, BillCount] {
> override def map(value: *Record*) = {
> *BillCount*(value.getLogis_id, value.getProvince_id, value.getCity_id,
> value.getOrder_require_varieties, value.getOrder_rec_amount, value.getStore_rec_date.getTime)
> }
> })
> val stream = tbDataStream.toTable(tbEnv, 'logisId, 'provinceId, 'cityId, 'orderRequVari, 'orderRecAmount, 'orderRecDate)
> // occur error here
>
> Error :
> Exception in thread "main" org.apache.flink.table.api.TableException: Table of atomic type can only have a single field.
> at org.apache.flink.table.api.TableEnvironment$$anonfun$1.apply(TableEnvironment.scala:627)
> at org.apache.flink.table.api.TableEnvironment$$anonfun$1.apply(TableEnvironment.scala:624)
> at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
> at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
> at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
> at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
> at scala.collection.mutable.ArrayOps$ofRef.flatMap(ArrayOps.scala:108)
> at org.apache.flink.table.api.TableEnvironment.getFieldInfo(TableEnvironment.scala:624)
> at org.apache.flink.table.api.StreamTableEnvironment.registerDataStreamInternal(StreamTableEnvironment.scala:398)
> at org.apache.flink.table.api.scala.StreamTableEnvironment.fromDataStream(StreamTableEnvironment.scala:85)
> at org.apache.flink.table.api.scala.DataStreamConversions.toTable(DataStreamConversions.scala:58)
>
>
> Thanks.
>
>
> ------------------------------------------------------------------------
> laney0606@163.com <ma...@163.com>
>
>
> 【网易自营|30天无忧退货】仅售同款价1/4!MUJI制造商“2017秋冬舒适家居拖鞋系列”限时仅34.9元>>
> <http://you.163.com/item/detail?id=1165011&from=web_gg_mail_jiaobiao_9>
>
>
Re: Exception : Table of atomic type can only have a single field,
when transferring DataStream to Table ?
Posted by Haohui Mai <ri...@gmail.com>.
Hi,
I think instead of generating DataStream[BillCount], the correct way is to
generate DataStream[Row], that is,
kafkaInputStream.map(value -> Row.of(value.getLogis_id,
value.getProvince_id, value.getCity_id, value.getOrder_require_varieties,
value.getOrder_rec_amount, value.getStore_rec_date.getTime)
That should work.
Regards,
Haohui
On Sun, Sep 24, 2017 at 6:40 PM laney0606@163.com <la...@163.com> wrote:
> Hi,
> I‘m confused about a problem, occuring a exception "
> org.apache.flink.table.api.TableException: Table of atomic type can only have a single field.
> "
> Both *BillCount *and *Record *are class object*.* Following is code.
>
> case class *BillCount*
> (logisId: Int, provinceId: Int, cityId: Int, orderRequVari: Int, orderRecAmount: Double, orderRecDate: Long)
> val kafkaInputStream: DataStream[*Record*] = env.addSource(source)
> //source is FlinkKafkaConsumer010 source
> val tbDataStream : DataStream[*BillCount*] = kafkaInputStream.map(
> new MapFunction[Record, BillCount] {
> override def map(value: *Record*) = {
> *BillCount*
> (value.getLogis_id, value.getProvince_id, value.getCity_id,
>
> value.getOrder_require_varieties, value.getOrder_rec_amount, value.getStore_rec_date.getTime)
> }
> })
>
> val stream = tbDataStream.toTable(tbEnv, 'logisId, 'provinceId, 'cityId, 'orderRequVari, 'orderRecAmount, 'orderRecDate)
> // occur error here
>
>
> Error :
>
> Exception in thread "main" org.apache.flink.table.api.TableException: Table of atomic type can only have a single field.
>
> at org.apache.flink.table.api.TableEnvironment$$anonfun$1.apply(TableEnvironment.scala:627)
>
> at org.apache.flink.table.api.TableEnvironment$$anonfun$1.apply(TableEnvironment.scala:624)
>
> at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
>
> at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
>
> at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>
> at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
> at scala.collection.mutable.ArrayOps$ofRef.flatMap(ArrayOps.scala:108)
>
> at org.apache.flink.table.api.TableEnvironment.getFieldInfo(TableEnvironment.scala:624)
>
> at org.apache.flink.table.api.StreamTableEnvironment.registerDataStreamInternal(StreamTableEnvironment.scala:398)
>
> at org.apache.flink.table.api.scala.StreamTableEnvironment.fromDataStream(StreamTableEnvironment.scala:85)
>
> at org.apache.flink.table.api.scala.DataStreamConversions.toTable(DataStreamConversions.scala:58)
>
>
> Thanks.
>
>
> ------------------------------
> laney0606@163.com
>
>
> 【网易自营|30天无忧退货】仅售同款价1/4!MUJI制造商“2017秋冬舒适家居拖鞋系列”限时仅34.9元>>
> <http://you.163.com/item/detail?id=1165011&from=web_gg_mail_jiaobiao_9>
>
>