Posted to user@flink.apache.org by "laney0606@163.com" <la...@163.com> on 2017/09/25 01:40:35 UTC

Exception : Table of atomic type can only have a single field, when transferring DataStream to Table ?

Hi,
     I'm confused by a problem: the exception "org.apache.flink.table.api.TableException: Table of atomic type can only have a single field." is thrown when converting a DataStream to a Table.
     Both BillCount and Record are classes. The code follows.
        
       case class BillCount(logisId: Int, provinceId: Int, cityId: Int, orderRequVari: Int, orderRecAmount: Double, orderRecDate: Long)

       val kafkaInputStream: DataStream[Record] = env.addSource(source)  // source is a FlinkKafkaConsumer010
       val tbDataStream: DataStream[BillCount] = kafkaInputStream.map(
         new MapFunction[Record, BillCount] {
           override def map(value: Record): BillCount = {
             BillCount(value.getLogis_id, value.getProvince_id, value.getCity_id,
               value.getOrder_require_varieties, value.getOrder_rec_amount, value.getStore_rec_date.getTime)
           }
         })

       // The exception is thrown here:
       val stream = tbDataStream.toTable(tbEnv, 'logisId, 'provinceId, 'cityId, 'orderRequVari, 'orderRecAmount, 'orderRecDate)


      
    Error:
    Exception in thread "main" org.apache.flink.table.api.TableException: Table of atomic type can only have a single field.
        at org.apache.flink.table.api.TableEnvironment$$anonfun$1.apply(TableEnvironment.scala:627)
        at org.apache.flink.table.api.TableEnvironment$$anonfun$1.apply(TableEnvironment.scala:624)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
        at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
        at scala.collection.mutable.ArrayOps$ofRef.flatMap(ArrayOps.scala:108)
        at org.apache.flink.table.api.TableEnvironment.getFieldInfo(TableEnvironment.scala:624)
        at org.apache.flink.table.api.StreamTableEnvironment.registerDataStreamInternal(StreamTableEnvironment.scala:398)
        at org.apache.flink.table.api.scala.StreamTableEnvironment.fromDataStream(StreamTableEnvironment.scala:85)
        at org.apache.flink.table.api.scala.DataStreamConversions.toTable(DataStreamConversions.scala:58)




  Thanks.





laney0606@163.com

Re: Exception : Table of atomic type can only have a single field, when transferring DataStream to Table ?

Posted by Timo Walther <tw...@apache.org>.
Hi,

I also replied to your Stack Overflow question. I think the problem is 
that BillCount has the wrong type and is therefore treated as a single 
black box, i.e. an atomic type.

Haohui's suggestion will not work as-is, because the Row type needs 
explicit information about its fields. The easiest fix is to figure out 
why BillCount has the wrong type. Make sure that it is defined 
statically, e.g. as a top-level case class rather than inside a method, 
so that the Scala type analysis can recognize it.

What type is Record? Maybe you don't need the additional MapFunction but 
can use the Table API for mapping.
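
For reference, a minimal sketch of the case-class approach, assuming the 
Flink 1.3 Scala APIs; Record, source and the getters are as in the 
original post, and the enclosing object BillCountJob is just a 
placeholder name. The key point is that BillCount is declared at the top 
level, outside any method:

     import org.apache.flink.streaming.api.scala._
     import org.apache.flink.table.api.TableEnvironment
     import org.apache.flink.table.api.scala._

     // Declared at the top level, not inside a method, so Flink's
     // macro-based type analysis sees a case class with six fields
     // instead of an opaque (atomic) type.
     case class BillCount(logisId: Int, provinceId: Int, cityId: Int,
                          orderRequVari: Int, orderRecAmount: Double, orderRecDate: Long)

     object BillCountJob {
       def main(args: Array[String]): Unit = {
         val env = StreamExecutionEnvironment.getExecutionEnvironment
         val tbEnv = TableEnvironment.getTableEnvironment(env)

         // source is the FlinkKafkaConsumer010 from the original post.
         val kafkaInputStream: DataStream[Record] = env.addSource(source)

         // A plain Scala function instead of an anonymous MapFunction;
         // the TypeInformation for BillCount is derived implicitly here.
         val tbDataStream: DataStream[BillCount] = kafkaInputStream.map { r =>
           BillCount(r.getLogis_id, r.getProvince_id, r.getCity_id,
             r.getOrder_require_varieties, r.getOrder_rec_amount,
             r.getStore_rec_date.getTime)
         }

         val stream = tbDataStream.toTable(tbEnv, 'logisId, 'provinceId, 'cityId,
           'orderRequVari, 'orderRecAmount, 'orderRecDate)
       }
     }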

Regards,
Timo


On 9/25/17 9:29 AM, Haohui Mai wrote:
> Hi,
>
> I think instead of generating DataStream[BillCount], the correct way 
> is to generate DataStream[Row], that is,
>
> kafkaInputStream.map(value => Row.of(value.getLogis_id,
> value.getProvince_id, value.getCity_id,
> value.getOrder_require_varieties, value.getOrder_rec_amount,
> value.getStore_rec_date.getTime))
>
> That should work.
>
> Regards,
> Haohui
>
> On Sun, Sep 24, 2017 at 6:40 PM laney0606@163.com wrote:
>
>     [quoted original message trimmed]
>


Re: Exception : Table of atomic type can only have a single field, when transferring DataStream to Table ?

Posted by Haohui Mai <ri...@gmail.com>.
Hi,

I think instead of generating DataStream[BillCount], the correct way is to
generate DataStream[Row], that is:

kafkaInputStream.map(value => Row.of(value.getLogis_id,
  value.getProvince_id, value.getCity_id, value.getOrder_require_varieties,
  value.getOrder_rec_amount, value.getStore_rec_date.getTime))

That should work.
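
Note that a plain DataStream[Row] does not carry any field information by 
itself, so this only works if the type is attached explicitly. A minimal 
sketch of one way to do that in the Scala API, assuming a RowTypeInfo 
whose field names and types mirror BillCount, supplied as the implicit 
TypeInformation for the map call:

     import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
     import org.apache.flink.api.java.typeutils.RowTypeInfo
     import org.apache.flink.streaming.api.scala._
     import org.apache.flink.table.api.scala._
     import org.apache.flink.types.Row

     // Row erases its field types at compile time, so declare them once;
     // the Scala DataStream.map picks this TypeInformation up implicitly.
     implicit val rowType: TypeInformation[Row] = new RowTypeInfo(
       Array[TypeInformation[_]](
         BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO,
         BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO,
         BasicTypeInfo.DOUBLE_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO),
       Array("logisId", "provinceId", "cityId",
         "orderRequVari", "orderRecAmount", "orderRecDate"))

     val rowStream: DataStream[Row] = kafkaInputStream.map { value =>
       Row.of(value.getLogis_id, value.getProvince_id, value.getCity_id,
         value.getOrder_require_varieties, value.getOrder_rec_amount,
         value.getStore_rec_date.getTime)
     }

     // With the field information in place, the conversion succeeds.
     val table = rowStream.toTable(tbEnv, 'logisId, 'provinceId, 'cityId,
       'orderRequVari, 'orderRecAmount, 'orderRecDate)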

Regards,
Haohui



On Sun, Sep 24, 2017 at 6:40 PM laney0606@163.com <la...@163.com> wrote:

> [quoted original message trimmed]
>