Posted to dev@flink.apache.org by Fabian Hueske <fh...@gmail.com> on 2018/01/05 10:31:39 UTC

Re: Using SQL with dynamic tables where rows are updated

Hi Ghassan,

Flink's Table API / SQL does not yet support an upsert ingestion mode
(updating table rows by key). It supports only append mode, i.e., each event
of the data stream is appended to the table.
Hence, it is not possible to implement your use case using SQL.

An upsert ingestion mode will be added in a future version of Flink.

Best, Fabian
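
To make the difference between the two ingestion modes concrete, here is a minimal sketch in plain Scala (no Flink dependency). It replays the four events from the original question under both semantics. The names `Review`, `appendAverage`, and `upsertAverage` are hypothetical and only model the behavior; they are not Flink APIs.

```scala
// Plain Scala model of append vs. upsert ingestion (illustration only, not Flink code).
object IngestionModes {
  final case class Review(reviewId: String, productId: String, rating: Double)

  // The four events from the original question; the second "review-3"
  // is meant to replace the first one. All events share one product ID.
  val events: List[Review] = List(
    Review("review-1", "product-100", 1.0),
    Review("review-2", "product-100", 4.0),
    Review("review-3", "product-100", 6.0),
    Review("review-3", "product-100", 3.0)
  )

  private def average(ratings: Iterable[Double]): Double =
    ratings.sum / ratings.size

  // Append mode: every event becomes its own row, so all four ratings count.
  def appendAverage(rows: List[Review]): Double =
    average(rows.map(_.rating))

  // Upsert mode: a later event with the same reviewId replaces the earlier row.
  def upsertAverage(rows: List[Review]): Double = {
    val latestByKey = rows.foldLeft(Map.empty[String, Review]) { (table, r) =>
      table.updated(r.reviewId, r)
    }
    average(latestByKey.values.map(_.rating))
  }

  def main(args: Array[String]): Unit = {
    println(appendAverage(events)) // (1 + 4 + 6 + 3) / 4
    println(upsertAverage(events)) // (1 + 4 + 3) / 3
  }
}
```

Append mode averages all four ratings, which is the 3.5 reported in the thread; the upsert model averages only the latest rating per review.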

2017-12-21 5:20 GMT+01:00 Ghassan Yammine <Gh...@bazaarvoice.com>:

> Hi Timo,
>
> Thanks for your quick reply.
>
> I understand your statement about the SQL expression:  It is grouped by
> ProductID and performs the average computation over all the records it
> sees.  There is no concept of primary key and INSERT/UPDATE.
>
> Is there a way to do what I want using SQL without, or with, the
> pre-processing of the stream before a SQL expression is executed?
>
> One thought I had was to transform the stream - prior to the SQL
> expression - by doing something like morphing the value of the Rating to be
> relative (instead of absolute) to the prior Rating, for the same review.
> However, this would entail some review tracking mechanism.
>
> As I mentioned before, I’ve implemented this using the Datastream API -
> internally maintaining a map of reviews and updating the average
> calculation as each record is processed.  Using SQL is very important to
> us, and I’d like to see if I can make it work.
>
> Regarding the output sink, I previously looked into that and concluded
> that I could not perform any “adjustment” to the average calculation
> because of what the current SQL expression emits (i.e. incorrect average
> calc.).  Here is what the (retract) stream looks like post-SQL:
>
> (true,(product-100,1.0))
> (false,(product-100,1.0))
> (true,(product-100,2.5))
> (false,(product-100,2.5))
> (true,(product-100,3.6666666666666665))
> (false,(product-100,3.6666666666666665))
> (true,(product-100,3.5)).   <———————— This is the correct value (per the
> SQL) but not what I want it to be.
>
> Even if I change what the SQL query returns/emits so that the TableSink
> can perform the average calculation, the latter will have to track prior
> reviews in order to update its average calculation.  If I’m correct, then
> this is essentially no different than the Datastream API implementation
> that I have.
>
> Again, thanks for your quick response and help.
>
> Regards,
>
> Ghassan
>
>
>
> On Dec 20, 2017, at 11:50 AM, Timo Walther <twalthr@apache.org> wrote:
>
> Hi Ghassan,
>
> In your example, the result 3.5 is correct. The query is executed with
> standard SQL semantics. You only group by ProductID, and since it is the
> same for all elements, the average is 3.5.
>
> The second "review-3" does not replace anything. In general, the
> replacement would happen in the TableSink. The dynamic table performs view
> maintenance. The TableSink materializes the result to some key-value store
> or database.
>
> It might be worth looking into TableSinks [0] and the JavaDocs of the
> mentioned classes.
>
> Feel free to ask further questions if necessary.
>
> Regards,
> Timo
>
> [0] https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/table/sourceSinks.html#define-a-tablesink
>
>
>
> On 12/19/17 at 9:08 PM, Ghassan Yammine wrote:
> Hello,
>
> I’m new to Flink and I need some help.  I’d like to use the SQL API for
> processing an incoming stream that has the following characteristics:
>
>   *   Each stream record has a key
>   *   The record can be updated
>   *   The record is of the form: reviewId -> (productId, rating)
>
> For the above stream, I want to compute the average rating for each
> product ID.  The key is the reviewId.
> With the SQL API, I get incorrect results.  However, I’ve been able to
> make it work through the use of RichFlatMapFunction and the Datastream API.
>
> Below is the entire code listing, which does not work.  I know I’m missing
> the definition/use of a primary key so that an update on the same key can
> occur.
> However, I’m not sure how to go about doing this.  Any help/comments are
> welcome.
>
> Thank you,
>
> Ghassan
>
>
> package com.bazaarvoice.flink_poc
>
> import com.bazaarvoice.flink_poc.flink_poc.{ProductId, ReviewId}
> import org.apache.flink.api.common.time.Time
> import org.apache.flink.streaming.api.TimeCharacteristic
> import org.apache.flink.streaming.api.scala.{DataStream,
> createTypeInformation, _}
> import org.apache.flink.table.api.{StreamQueryConfig, TableEnvironment}
> import org.apache.flink.table.api.scala._
>
> package object flink_poc{
>   type ProductId = String
>   type ReviewId = String
> }
>
> case class SummaryReview(reviewId: ReviewId, productId: ProductId,
> approved: Boolean, rating: Double) extends Serializable {
>   override def toString: String = {
>     s"$reviewId,  $productId,  ${if (approved) "APPROVED" else "REJECTED"},  $rating"
>   }
> }
>
> object AverageRatingWithSQL {
>
>   def main(args: Array[String]): Unit = {
>
>     val events = List(
>       SummaryReview("review-1", "product-100", approved = true, 1),
>       SummaryReview("review-2", "product-100", approved = true, 4),
>       SummaryReview("review-3", "product-100", approved = true, 6),
>       SummaryReview("review-3", "product-100", approved = true, 3) // <-- this should override the previous record
>     ).toSeq
>     // Average rating should be equal to (1+4+3)/3 = 2.666667
>
>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>     env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
>
>     val inputStream: DataStream[SummaryReview] = env.fromCollection(events)
>
>     val tableEnv = TableEnvironment.getTableEnvironment(env)
>
>     tableEnv.registerDataStream("Reviews", inputStream, 'ReviewID,
> 'ProductID, 'Approved, 'Rating)
>
>     val resultTable = tableEnv.sql(
>       "SELECT ProductID, AVG(Rating) FROM Reviews WHERE Approved = true GROUP BY ProductID"
>     )
>
>     val typeInfo = createTypeInformation[(ProductId, Double)]
>     val outStream = resultTable.toRetractStream(typeInfo)
>
>     outStream.print()
>
>     env.execute("Flink SQL Average rating")
>
>   }
> }
>
>
>
>
>
>
>
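
The retract stream quoted above can be read as a sequence of add (`true`) and retract (`false`) messages. The following is a minimal sketch, in plain Scala without any Flink dependency, of how a sink could materialize such a stream into a key-value view, in the spirit of Timo's remark that the TableSink materializes the result. `RetractMsg` and `materialize` are hypothetical names chosen for illustration, not Flink APIs.

```scala
// Plain Scala model of materializing a retract stream (illustration only).
object RetractMaterializer {
  // (isAdd, (key, value)): true adds a row, false retracts a previously added row.
  type RetractMsg = (Boolean, (String, Double))

  def materialize(stream: Seq[RetractMsg]): Map[String, Double] =
    stream.foldLeft(Map.empty[String, Double]) {
      case (view, (true, (key, value))) =>
        view.updated(key, value)
      case (view, (false, (key, value))) =>
        // Only drop the entry if it still holds the retracted value.
        if (view.get(key).contains(value)) view - key else view
    }

  // The messages printed in the thread for the append-mode query.
  val observed: Seq[RetractMsg] = Seq(
    (true, ("product-100", 1.0)),
    (false, ("product-100", 1.0)),
    (true, ("product-100", 2.5)),
    (false, ("product-100", 2.5)),
    (true, ("product-100", 3.6666666666666665)),
    (false, ("product-100", 3.6666666666666665)),
    (true, ("product-100", 3.5))
  )
}
```

After all seven messages are applied, the view holds a single entry for product-100 with the last added value. The sink can only materialize what the query emits; it cannot recover the per-review updates that the query never saw as updates.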

Re: Using SQL with dynamic tables where rows are updated

Posted by Ghassan Yammine <Gh...@bazaarvoice.com>.
Hello,

No, I did not use Hequn’s work.  I already had an implementation that pre-processed the stream before feeding it into the (now modified) SQL expression.
Basically, instead of computing the AVG, the SQL just computes the COUNT and SUM.  I do the average computation on the resulting stream.
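
The message above does not show the pre-processing itself. The sketch below is one possible shape for it, written in plain Scala under stated assumptions: each event is turned into a delta relative to the previous rating for the same review (the idea floated earlier in the thread), and the count and sum are then aggregated per product. `Delta`, `toDeltas`, and `averages` are hypothetical names, and the sketch assumes a review never changes its product ID.

```scala
// Plain Scala model of delta pre-processing followed by SUM-based aggregation.
// This is an assumed design for illustration, not the poster's actual code.
object DeltaPreprocessing {
  final case class Review(reviewId: String, productId: String, rating: Double)

  // countDelta is 1 for a new review and 0 for an update of an existing one.
  final case class Delta(productId: String, countDelta: Int, ratingDelta: Double)

  // Tracks the last rating per review and emits the change relative to it.
  def toDeltas(events: Seq[Review]): Seq[Delta] = {
    val (_, deltas) =
      events.foldLeft((Map.empty[String, Double], Vector.empty[Delta])) {
        case ((seen, out), r) =>
          val delta = seen.get(r.reviewId) match {
            case Some(previous) => Delta(r.productId, 0, r.rating - previous)
            case None           => Delta(r.productId, 1, r.rating)
          }
          (seen.updated(r.reviewId, r.rating), out :+ delta)
      }
    deltas
  }

  // Models a query that sums countDelta and ratingDelta per product,
  // with the division done outside SQL on the resulting stream.
  def averages(deltas: Seq[Delta]): Map[String, Double] =
    deltas.groupBy(_.productId).map { case (productId, ds) =>
      productId -> (ds.map(_.ratingDelta).sum / ds.map(_.countDelta).sum)
    }
}
```

With the four events from the original question, the deltas sum to a count of 3 and a rating total of 8, so the division yields the desired average rather than 3.5.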

> On Jan 10, 2018, at 3:35 AM, yinhua <yh...@gmail.com> wrote:
> 
> Hi Ghassan,
> 
> I have the same issue as you. How did you solve your problem in the end?
> Did you use Hequn's workaround?
> 
> 
> 
> --
> Sent from: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/


Re: Using SQL with dynamic tables where rows are updated

Posted by yinhua <yh...@gmail.com>.
Hi Ghassan,

I have the same issue as you. How did you solve your problem in the end?
Did you use Hequn's workaround?



--
Sent from: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/

Re: Using SQL with dynamic tables where rows are updated

Posted by Ghassan Yammine <Gh...@bazaarvoice.com>.
Thanks very much!

Regards,
Ghassan


> On Jan 5, 2018, at 7:55 AM, Hequn Cheng <ch...@gmail.com> wrote:
> 
> Hi Ghassan,
> 
> TableSource in Flink does not support primary keys yet, but you can achieve
> the same effect by doing a GROUP BY manually.
> 
> For example:
> val resultTable = tableEnv.sql(
>   """
>     |SELECT ProductID, AVG(Rating) FROM
>     |  (SELECT ReviewID, LAST_VALUE(ProductID) AS ProductID,
>     |          LAST_VALUE(Approved) AS Approved, LAST_VALUE(Rating) AS Rating
>     |   FROM Reviews GROUP BY ReviewID)
>     |WHERE Approved = true GROUP BY ProductID
>   """.stripMargin
> )
> 
> You have to implement the LAST_VALUE AggregateFunction yourself. For the
> implementation, you can refer to the MAX AggregateFunction (MAX always
> returns the max value while LAST_VALUE always returns the latest value).
> You can find the documentation on aggregate functions here:
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/table/udfs.html#aggregation-functions
> 
> Best, Hequn
> 
> 


Re: Using SQL with dynamic tables where rows are updated

Posted by Fabian Hueske <fh...@gmail.com>.
Also, if you expect the update records for a key to arrive out of order, you
might want to add a ProcessFunction on a keyed stream that filters out records
with smaller timestamps than the highest observed timestamp.
This would prevent a record from being overridden by an earlier version with a
smaller timestamp.
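
The filtering rule described above can be sketched in plain Scala as follows. This is only a model of the logic, with no Flink dependency: in a real job the per-key "highest timestamp" would presumably live in keyed state inside the ProcessFunction. `Update` and `dropStale` are hypothetical names.

```scala
// Plain Scala model of dropping out-of-order updates per key (illustration only).
object LatestVersionFilter {
  final case class Update(key: String, timestamp: Long, rating: Double)

  // Keeps a record only if its timestamp is not smaller than the highest
  // timestamp observed so far for the same key.
  def dropStale(updates: Seq[Update]): Seq[Update] = {
    val (_, kept) =
      updates.foldLeft((Map.empty[String, Long], Vector.empty[Update])) {
        case ((highest, out), u) =>
          if (highest.get(u.key).exists(_ > u.timestamp)) (highest, out)
          else (highest.updated(u.key, u.timestamp), out :+ u)
      }
    kept
  }
}
```

For example, if the version of review-3 with timestamp 2 arrives before the version with timestamp 1, the older version is dropped and cannot overwrite the newer one downstream.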

2018-01-05 15:05 GMT+01:00 Fabian Hueske <fh...@gmail.com>:

> Ha, that's a neat workaround! Thanks for sharing Hequn!
>
> When doing this, you should, however, ensure that all records with the same
> key arrive from the same input task to avoid inconsistent behavior due to
> records arriving out of order.
> This would be the case if you ingest the table directly from a Kafka topic
> that is partitioned by key.
>
> Best, Fabian
>

Re: Using SQL with dynamic tables where rows are updated

Posted by Fabian Hueske <fh...@gmail.com>.
Ha, that's a neat workaround! Thanks for sharing Hequn!

When doing this, you should, however, ensure that all records with the same
key arrive from the same input task to avoid inconsistent behavior due to
records arriving out of order.
This would be the case if you ingest the table directly from a Kafka topic
that is partitioned by key.

Best, Fabian


Re: Using SQL with dynamic tables where rows are updated

Posted by Hequn Cheng <ch...@gmail.com>.
Hi Ghassan,

Flink's TableSource does not support primary keys yet, but you can achieve
the same effect by grouping on the key manually.

For example:
val resultTable = tableEnv.sql(
  """
    |SELECT ProductID, AVG(Rating)
    |FROM (
    |  SELECT ReviewID,
    |         LAST_VALUE(ProductID) AS ProductID,
    |         LAST_VALUE(Approved) AS Approved,
    |         LAST_VALUE(Rating) AS Rating
    |  FROM Reviews
    |  GROUP BY ReviewID
    |)
    |WHERE Approved = true
    |GROUP BY ProductID
  """.stripMargin
)
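
To check what the nested query is meant to compute, here is a minimal sketch
in plain Scala with no Flink dependency. It only illustrates the semantics and
is not Flink code: the object UpsertAverageSketch, the Review case class and
both helper names are hypothetical (Review mirrors SummaryReview from the
original listing).

```scala
object UpsertAverageSketch {
  // Hypothetical stand-in for SummaryReview from the original listing.
  final case class Review(reviewId: String, productId: String, approved: Boolean, rating: Double)

  // Inner query: GROUP BY ReviewID and keep only the last row seen per review.
  // groupBy keeps the original element order inside each group, so `last`
  // is the most recently arrived record for that reviewId.
  def latestPerReview(events: Seq[Review]): Seq[Review] =
    events.groupBy(_.reviewId).values.map(_.last).toSeq

  // Outer query: WHERE Approved = true GROUP BY ProductID with AVG(Rating).
  def averagePerProduct(events: Seq[Review]): Map[String, Double] =
    latestPerReview(events)
      .filter(_.approved)
      .groupBy(_.productId)
      .map { case (productId, rs) => productId -> rs.map(_.rating).sum / rs.size }
}
```

With the four sample events from the listing, averagePerProduct gives
(1 + 4 + 3) / 3 for product-100, which is the value Ghassan expects, rather
than the 3.5 produced by averaging all four records.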

You have to implement the LAST_VALUE AggregateFunction yourself. For the
implementation, you can refer to the MAX AggregateFunction (MAX always
returns the maximum value, while LAST_VALUE always returns the latest value).
You can also find the documentation on aggregate functions here:
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/table/udfs.html#aggregation-functions
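
The difference between MAX and LAST_VALUE comes down to one line in
accumulate. Below is a minimal sketch in plain Scala. It is an assumption-laden
illustration, not the Flink implementation: LastValueSketch, LastValueAcc and
LastValue are hypothetical names, and the class deliberately does not extend
Flink's AggregateFunction so that it runs without Flink on the classpath. The
method names follow the createAccumulator / accumulate / getValue pattern
described in the documentation linked above; a real implementation would
extend AggregateFunction and be registered with the table environment.

```scala
object LastValueSketch {
  // Mutable accumulator holding the most recent value passed to accumulate.
  final class LastValueAcc[T] {
    var last: Option[T] = None
  }

  // Hypothetical stand-in for an AggregateFunction; it does not extend any Flink class.
  final class LastValue[T] {
    def createAccumulator(): LastValueAcc[T] = new LastValueAcc[T]

    // MAX would keep the larger of the stored and the incoming value;
    // LAST_VALUE simply overwrites the stored value with the incoming one.
    def accumulate(acc: LastValueAcc[T], value: T): Unit =
      acc.last = Some(value)

    // None until at least one value has been accumulated.
    def getValue(acc: LastValueAcc[T]): Option[T] = acc.last
  }
}
```

Note that "latest" here means latest in arrival order, so the result depends
on the order in which records for the same ReviewID reach the aggregation.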

Best, Hequn


2018-01-05 18:31 GMT+08:00 Fabian Hueske <fh...@gmail.com>:

> Hi Ghassan,
>
> Flink's Table API / SQL does not yet support the upsert ingestion mode
> (updating table rows by key) but only append mode, i.e., each event of
> the data stream is appended to the table.
> Hence, it is not possible to implement your use case using SQL.
>
> An upsert ingestion mode will be added in a future version of Flink.
>
> Best, Fabian
>
> 2017-12-21 5:20 GMT+01:00 Ghassan Yammine <Ghassan.Yammine@bazaarvoice.com>:
>
> > Hi Timo,
> >
> > Thanks for your quick reply.
> >
> > I understand your statement about the SQL expression:  It is grouped by
> > ProductID and performs the average computation over all the records it
> > sees.  There is no concept of primary key and INSERT/UPDATE.
> >
> > Is there a way to do what I want using SQL without, or with, the
> > pre-processing of the stream before a SQL expression is executed?
> >
> > One thought I had was to transform the stream - prior to the SQL
> > expression - by doing something like morphing the value of the Rating to
> > be relative (instead of absolute) to the prior Rating, for the same review.
> > However, this would entail some review tracking mechanism.
> >
> > As I mentioned before, I’ve implemented this using the Datastream API -
> > internally maintaining a map of reviews and updating the average
> > calculation as each record is processed.  Using SQL is very important to
> > us, and I’d like to see if I can make it work.
> >
> > Regarding the output sink, I previously looked into that and concluded
> > that I could not perform any “adjustment” to the average calculation
> > because of what the current SQL expression emits (i.e. incorrect average
> > calc.).  Here is what the (retract) stream looks like post-SQL:
> >
> > (true,(product-100,1.0))
> > (false,(product-100,1.0))
> > (true,(product-100,2.5))
> > (false,(product-100,2.5))
> > (true,(product-100,3.6666666666666665))
> > (false,(product-100,3.6666666666666665))
> > (true,(product-100,3.5)).   <———————— This is the correct value (per the
> > SQL) but not what I want it to be.
> >
> > Even if I change what the SQL query returns/emits so that the TableSink
> > can perform the average calculation, the latter will have to track prior
> > reviews in order to update its average calculation.  If I’m correct, then
> > this is essentially no different than the Datastream API implementation
> > that I have.
> >
> > Again, thanks for your quick response and help.
> >
> > Regards,
> >
> > Ghassan
> >
> >
> >
> > On Dec 20, 2017, at 11:50 AM, Timo Walther <twalthr@apache.org> wrote:
> >
> > Hi Ghassan,
> >
> > In your example the result 3.5 is correct. The query is executed with
> > standard SQL semantics. You only group by ProductID and since it is the
> > same for all elements, the average is 3.5.
> >
> > The second "review-3" does not replace anything. In general, the
> > replacement would happen in the TableSink. The dynamic table performs
> > view maintenance. The TableSink materializes the result to some key-value
> > store or database.
> >
> > It might be worth looking into TableSinks [0] and the JavaDocs of the
> > mentioned classes.
> >
> > Feel free to ask further questions if necessary.
> >
> > Regards,
> > Timo
> >
> > [0] https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/table/sourceSinks.html#define-a-tablesink
> >
> >
> >
> > On 12/19/17 at 9:08 PM, Ghassan Yammine wrote:
> > Hello,
> >
> > I’m new to Flink and I need some help.  I’d like to use the SQL API for
> > processing an incoming stream that has the following characteristics:
> >
> >   *   Each stream record has a key
> >   *   The record can be updated
> >   *   The record is of the form: reviewId -> (productId, rating)
> >
> > For the above stream, I want to compute the average rating for each
> > product ID.  The key is the reviewId.
> > With the SQL API, I get incorrect results.  However, I’ve been able to
> > make it work through the use of RichFlatMapFunction and the DataStream
> > API.
> >
> > Below is the entire code listing, which does not work.  I know I’m
> > missing the definition/use of a primary key so that an update on the
> > same key can occur.
> > However, I’m not sure how to go about doing this.  Any help/comments are
> > welcome.
> >
> > Thank you,
> >
> > Ghassan
> >
> >
> > package com.bazaarvoice.flink_poc
> >
> > import com.bazaarvoice.flink_poc.flink_poc.{ProductId, ReviewId}
> > import org.apache.flink.api.common.time.Time
> > import org.apache.flink.streaming.api.TimeCharacteristic
> > import org.apache.flink.streaming.api.scala.{DataStream,
> > createTypeInformation, _}
> > import org.apache.flink.table.api.{StreamQueryConfig, TableEnvironment}
> > import org.apache.flink.table.api.scala._
> >
> > package object flink_poc{
> >   type ProductId = String
> >   type ReviewId = String
> > }
> >
> > case class SummaryReview(reviewId: ReviewId, productId: ProductId,
> > approved: Boolean, rating: Double) extends Serializable {
> >   override def toString: String = {
> >     s"$reviewId,  $productId,  ${if (approved) "APPROVED" else "REJECTED"},  $rating"
> >   }
> > }
> >
> > object AverageRatingWithSQL {
> >
> >   def main(args: Array[String]) {
> >
> >     val events = List(
> >       SummaryReview("review-1", "product-100", approved = true, 1),
> >       SummaryReview("review-2", "product-100", approved = true, 4),
> >       SummaryReview("review-3", "product-100", approved = true, 6),
> >       SummaryReview("review-3", "product-100", approved = true, 3) // <-- this should override the previous record
> >     ).toSeq
> >     // Average rating should be equal to (1+4+3)/3 = 2.666667
> >
> >     val env = StreamExecutionEnvironment.getExecutionEnvironment
> >     env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
> >
> >     val inputStream: DataStream[SummaryReview] = env.fromCollection(events)
> >
> >     val tableEnv = TableEnvironment.getTableEnvironment(env)
> >
> >     tableEnv.registerDataStream("Reviews", inputStream, 'ReviewID,
> > 'ProductID, 'Approved, 'Rating)
> >
> >     val resultTable = tableEnv.sql(
> >       "SELECT ProductID, AVG(Rating) FROM Reviews WHERE Approved = true GROUP BY ProductID"
> >     )
> >
> >     val typeInfo = createTypeInformation[(ProductId, Double)]
> >     val outStream = resultTable.toRetractStream(typeInfo)
> >
> >     outStream.print()
> >
> >     env.execute("Flink SQL Average rating")
> >
> >   }
> > }
> >
> >
> >
> >
> >
> >
> >
>