You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Fabian Hueske (JIRA)" <ji...@apache.org> on 2018/05/29 10:29:00 UTC

[jira] [Commented] (FLINK-9460) Redundant output in table & upsert semantics

    [ https://issues.apache.org/jira/browse/FLINK-9460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16493345#comment-16493345 ] 

Fabian Hueske commented on FLINK-9460:
--------------------------------------

First of all, the result is correct. 
The semantics of the {{UpsertTableSink}} is that it updates the previously emitted results based on a key (in this case {{SUBSTRING(record_time, 1, 16) as record_time}}).

The query is optimized into two group-by aggregations ({{GROUP BY SUBSTRING(record_time, 1, 16), user_id}} for the {{DISTINCT}} and {{GROUP BY SUBSTRING(record_time, 1, 16)}} for the final aggregation. The first aggregation operator emits its updates as retraction messages to the second one, i.e., an update is treated as a delete and and insert message. Each message is handled individually. This should cause the two update messages. 

I don't think that we can prevent cases of two update messages in general, however the case for {{DISTINCT}} {{GROUP BY}} aggregates can be improved if we leverage the recent support for distinct aggregates for windows also for non-windowed aggregates.

I thing the suggestion to not emit a message if {{!generateRetraction && !inputC.change == true}} is not correct, because we would not remove results if input records are retracted.


> Redundant output in table & upsert semantics
> --------------------------------------------
>
>                 Key: FLINK-9460
>                 URL: https://issues.apache.org/jira/browse/FLINK-9460
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table API &amp; SQL
>    Affects Versions: 1.5.0
>            Reporter: zhengcanbin
>            Priority: Major
>              Labels: patch
>             Fix For: 1.6.0
>
>         Attachments: image-2018-05-29-11-39-45-698.png, image-2018-05-29-11-51-20-671.png
>
>
> The output seems incorrect in my table & upsert example, here's the code:
> {code:java}
> object VerifyUpsert {
>   def main(args: Array[String]): Unit = {
>     val env = StreamExecutionEnvironment.createLocalEnvironment()
>     val tEnv = TableEnvironment.getTableEnvironment(env)
>     env.setParallelism(1)
>     val input = env.socketTextStream("localhost", 9099).map { x =>
>       val tokens = x.split(",")
>       DemoSource(tokens(0), tokens(1), tokens(2))
>     }
>     tEnv.registerDataStream("demoSource", input, 'record_time, 'user_id, 'page_id)
>     val fieldNames = Array("record_time", "pv", "uv")
>     val fieldTypes = Array(Types.STRING, Types.LONG, Types.LONG).asInstanceOf[Array[TypeInformation[_]]]
>     tEnv.registerTableSink("demoSink", fieldNames, fieldTypes, MyPrintSink(fieldNames, fieldTypes))
>     tEnv.sqlUpdate(
>       """
>         |INSERT INTO demoSink
>         |SELECT
>         |  SUBSTRING(record_time, 1, 16) as record_time,
>         |  count(user_id) as pv,
>         |  count(DISTINCT user_id) as uv
>         |FROM demoSource
>         |GROUP BY SUBSTRING(record_time, 1, 16)
>       """.stripMargin)
>     env.execute()
>   }
>   case class DemoSource(record_time: String, user_id: String, page_id: String)
> }
> case class MyPrintSink(var fNames: Array[String], var fTypes: Array[TypeInformation[_]]) extends UpsertStreamTableSink[Row] {
>   override def setKeyFields(keys: Array[String]): Unit = Seq.empty
>   override def setIsAppendOnly(isAppendOnly: lang.Boolean): Unit = {}
>   override def getRecordType: TypeInformation[Row] = new RowTypeInfo(fTypes, fNames)
>   override def emitDataStream(dataStream: DataStream[tuple.Tuple2[lang.Boolean, Row]]): Unit = dataStream.addSink(new PrintSinkFunction())
>   override def getFieldNames: Array[String] = fNames
>   override def getFieldTypes: Array[TypeInformation[_]] = fTypes
>   override def configure(fieldNames: Array[String], fieldTypes: Array[TypeInformation[_]]): TableSink[tuple.Tuple2[lang.Boolean, Row]] = {
>     val copy = MyPrintSink(fNames, fTypes)
>     copy.fNames = fieldNames
>     copy.fTypes = fieldTypes
>     copy
>   }
> }{code}
> when application starts, I type in netcat client one record a time,  below table shows outputs for every input record:
>  
> ||input||output||
> |2018-05-24 21:34:02,8,9|(true,2018-05-24 21:34,1,1)|
> |2018-05-24 21:34:12,6,6|(true,2018-05-24 21:34,2,2)|
> |2018-05-24 21:34:12,0,9|(true,2018-05-24 21:34,3,3)|
> |2018-05-24 21:34:12,0,4|{color:#ff0000}(true,2018-05-24 21:34,2,2){color}
>  (true,2018-05-24 21:34,4,3)|
>  
> when the forth record is consumed, two output records would be printed in sink, obviously the first one record with red color is redundant. I followed the source code and found something wrong with 
>  
> {code:java}
> org.apache.flink.table.runtime.aggregate.GroupAggProcessFunction#processElement
> {code}
> !image-2018-05-29-11-51-20-671.png!
> I think when (!generateRetraction) && !inputC.change is true, we should not invoke out.collect here.
>  
> [~StephanEwen] pls look over this



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)