Posted to user@flink.apache.org by Piyush Narang <p....@criteo.com> on 2019/06/03 16:20:10 UTC

Clean way of expressing UNNEST operations

Hi folks,

I’m using the SQL API and trying to figure out the best way to unnest and operate on some data.
My data is structured as follows:
Table: advertiser_event

  *   partnerId: Int
  *   products: Array< Row< price: Double, quantity: Int, … > >
  *   …

I’m trying to unnest the products array and then compute something from a couple of fields in the product row (e.g. price * quantity).

My query looks like this:
SELECT partnerId, price, quantity FROM advertiser_event, UNNEST(advertiser_event.products) AS t (price, quantity, field3, field4, …)

My problem is that when I unnest this array<row>, I need to list every field of the row in the alias of the temporary table (“t” above). If I don’t, I get an error saying the number of fields doesn’t match what is expected. This makes the query fragile whenever fields are added to or removed from the product structure.

Does anyone know if there’s a way around this? By contrast, on an engine like Presto, UNNEST yields a ‘product’ row from which I can pick just the fields I want (“product.price”, “product.quantity”).
Presto query:
SELECT partnerId, product.price, product.quantity FROM advertiser_event CROSS JOIN UNNEST(products) AS product

Thanks,

-- Piyush
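The alias-list behaviour described above can be pictured with a small plain-Scala model. It is not Flink or Calcite code: `Row`, `Event` and `FlattenedUnnest.unnest` are hypothetical names, a row is modelled as an ordered list of (name, value) pairs, and the degree check is modelled as running before any data is read, the way a validation-time error would. Because each element's fields are spread into separate columns, the alias list has to name every one of them.

```scala
// Plain-Scala model, NOT Flink or Calcite code. All names are hypothetical.

// A row is an ordered list of (fieldName, value) pairs.
final case class Row(fields: Vector[(String, Any)]) {
  // Look up a field by name.
  def apply(name: String): Any =
    fields.collectFirst { case (`name`, value) => value }
      .getOrElse(throw new NoSuchElementException(s"no field '$name'"))
}

// One advertiser_event record: partnerId plus an array of product rows.
final case class Event(partnerId: Int, products: Seq[Row])

object FlattenedUnnest {
  // Models `SELECT ... FROM advertiser_event, UNNEST(products) AS t (aliases...)`
  // when the array elements are rows. `productFields` is the declared row type of
  // the elements; the degree check runs up front, before any data is touched.
  def unnest(events: Seq[Event], productFields: Seq[String], aliases: Seq[String]): Seq[Row] = {
    if (aliases.size != productFields.size)
      throw new IllegalArgumentException(
        s"List of column aliases must have same degree as table; table has " +
          s"${productFields.size} columns, whereas alias list has ${aliases.size} columns")
    for {
      e <- events
      p <- e.products
    } // Each product field becomes its own column, renamed positionally by the aliases.
    yield Row(("partnerId" -> e.partnerId) +: aliases.zip(p.fields.map(_._2)).toVector)
  }
}
```

In this model, adding a fourth field to the product row type makes the three-alias call fail in the same way, which is the fragility described in the question.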


Re: Clean way of expressing UNNEST operations

Posted by Piyush Narang <p....@criteo.com>.
Hi Jingsong,

Thanks for getting back to me. I’ll try hooking up the UDTF.

I added a unit test that reproduces the issue I’m running into (I tested it against Flink 1.6, which is what we’re running, as well as against the latest master). Did you have to do anything in particular to hook up the type correctly?

The error I get is: “Caused by: org.apache.calcite.runtime.CalciteContextException: From line 1, column 114 to line 1, column 120: List of column aliases must have same degree as table; table has 3 columns ('price', 'quantity', 'externalId'), whereas alias list has 1 columns”

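import org.apache.flink.api.java.typeutils.TupleTypeInfo
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.Types
import org.apache.flink.table.api.scala._
import org.apache.flink.table.runtime.utils.StreamITCase
import org.apache.flink.types.Row
import org.junit.Test

@Test
def testArrayOfRow(): Unit = {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val tEnv = StreamTableEnvironment.create(env)
  StreamITCase.clear

  // Event = (partnerId, products), where products is an array of rows.
  class Event(t0: Int, t1: Array[Row]) extends org.apache.flink.api.java.tuple.Tuple2[Int, Array[Row]](t0, t1)
  def row(values: Any*): Row = Row.of(values.map(_.asInstanceOf[AnyRef]):_*)

  // Row type of each product: (price, quantity, externalId).
  val rowType = Types.ROW(fieldNames = Array("price", "quantity", "externalId"), Array(Types.DOUBLE, Types.INT, Types.INT))
  implicit val typeInfo = new TupleTypeInfo[Event](Types.INT, Types.OBJECT_ARRAY(rowType))

  val myArr1 = Array[Row](row(12.45, 10, 1))
  val myArr2 = Array[Row](row(10.0, 1, 1), row(20.0, 1, 1))
  val myArr3 = Array[Row](row(12.45, 10, 1))

  val input = env.fromElements[Event](
    new Event(123, myArr1),
    new Event(123, myArr2),
    new Event(456, myArr3)
  )

  tEnv.registerDataStream[Event]("advertiser_event", input, 'partnerId, 'products)

  // Fails during validation with the degree-mismatch error quoted above.
  val table = tEnv.sqlQuery("SELECT partnerId, product.price, product.quantity FROM advertiser_event, UNNEST(advertiser_event.products) AS t (product) ")
  table.toAppendStream[Row](table.getSchema.toRowType).print()
  env.execute() // only reached once the query validates; runs the job
}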

When I list all three fields instead of t(product), I don’t hit the issue.

Thanks,

-- Piyush


From: JingsongLee <lz...@aliyun.com>
Reply-To: JingsongLee <lz...@aliyun.com>
Date: Tuesday, June 4, 2019 at 2:42 AM
To: JingsongLee <lz...@aliyun.com>, Piyush Narang <p....@criteo.com>, "user@flink.apache.org" <us...@flink.apache.org>
Subject: Re: Clean way of expressing UNNEST operations

Hi @Piyush Narang

I tried again: if the type of advertiser_event.products is derived correctly (ObjectTypeInfo(RowTypeInfo(fields...))), it works. For details, see the Calcite code in SqlUnnestOperator.inferReturnType.
So I suspect your type is not being passed to the engine correctly.

Best, JingsongLee
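A rough plain-Scala model of the rule JingsongLee points to, assuming SqlUnnestOperator.inferReturnType treats array operands this way: when the array's component type is a row (struct), UNNEST produces one output column per field of that row; any other component type produces a single column. The real Calcite method covers more cases than this sketch, and the `SqlType` classes and the `EXPR$0` placeholder column name are assumptions made for illustration.

```scala
// Plain-Scala model, NOT Calcite's code. All type names are hypothetical.
sealed trait SqlType
final case class Scalar(name: String) extends SqlType
final case class Struct(fields: Vector[(String, SqlType)]) extends SqlType
final case class ArrayOf(component: SqlType) extends SqlType

object UnnestTypeModel {
  // Output columns of UNNEST(array): a struct component is flattened into its
  // fields; any other component becomes one column with a placeholder name.
  def outputColumns(array: ArrayOf, placeholderName: String = "EXPR$0"): Vector[(String, SqlType)] =
    array.component match {
      case Struct(fields) => fields
      case other          => Vector(placeholderName -> other)
    }
}
```

Under this model, a correctly derived ROW(price, quantity, externalId) element type yields three columns, which matches the column count in the error message Piyush reports.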

------------------------------------------------------------------
From: JingsongLee <lz...@aliyun.com>
Send Time: Tuesday, June 4, 2019 13:35
To: Piyush Narang <p....@criteo.com>; user@flink.apache.org <us...@flink.apache.org>
Subject: Re: Clean way of expressing UNNEST operations

Hi @Piyush Narang
It seems that Calcite's type inference is not perfect: the fields of the return type cannot be inferred in UNNEST. (The error is reported during Calcite's validation phase.)

However, a UDTF supports this usage. If it's convenient, you might consider writing a UDTF that does what UNNEST does and trying it out with JOIN LATERAL TABLE.

Best, JingsongLee
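The UDTF route can be sketched the same way. The following plain-Scala model is not Flink's TableFunction API: `explodeProducts` is a hypothetical table function that emits one output row per product, holding the whole product row in a single column named `product`, and `crossJoinLateral` models the join semantics (each event is paired with every row the function emits; events for which it emits nothing drop out). Because the query then reads fields by name from `product`, fields added to the product row are simply ignored. Whether Flink accepts a single row-typed result column in exactly this way is an assumption to verify.

```scala
// Plain-Scala model, NOT Flink's API. All names are hypothetical.
final case class Row(fields: Vector[(String, Any)]) {
  // Look up a field by name.
  def apply(name: String): Any =
    fields.collectFirst { case (`name`, value) => value }
      .getOrElse(throw new NoSuchElementException(s"no field '$name'"))
}

final case class Event(partnerId: Int, products: Seq[Row])

object LateralTableModel {
  // Hypothetical table function: one output row per product, carrying the whole
  // product row in a single column named "product". A null array is treated as empty.
  def explodeProducts(products: Seq[Row]): Seq[Row] =
    Option(products).getOrElse(Seq.empty).map(p => Row(Vector("product" -> p)))

  // FROM events, LATERAL TABLE(f(products)): pair each event with every row f
  // emits; an event for which f emits nothing produces no output.
  def crossJoinLateral(events: Seq[Event], f: Seq[Row] => Seq[Row]): Seq[(Event, Row)] =
    for {
      e   <- events
      out <- f(e.products)
    } yield (e, out)

  // price * quantity per product, reading fields by name from `product`,
  // so extra fields in the product row are ignored.
  def revenuePerProduct(events: Seq[Event]): Seq[(Int, Double)] =
    crossJoinLateral(events, explodeProducts).map { case (e, out) =>
      val p = out("product").asInstanceOf[Row]
      (e.partnerId, p("price").asInstanceOf[Double] * p("quantity").asInstanceOf[Int])
    }
}
```

In Flink itself the function would be a TableFunction subclass registered with the table environment; that wiring is not shown here.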


