You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "zhengcanbin (JIRA)" <ji...@apache.org> on 2018/05/29 04:00:00 UTC
[jira] [Updated] (FLINK-9460) Redundant output in table & upsert
semantics
[ https://issues.apache.org/jira/browse/FLINK-9460?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
zhengcanbin updated FLINK-9460:
-------------------------------
Description:
The output seems incorrect in my Table API upsert-sink example. Here is the code:
{code:java}
object VerifyUpsert {
  /**
   * Reproduction driver for FLINK-9460.
   *
   * Reads comma-separated `record_time,user_id,page_id` lines from a socket on
   * localhost:9099, computes per-minute pv (`count(user_id)`) and uv
   * (`count(DISTINCT user_id)`) with SQL, and writes the result into
   * [[MyPrintSink]], which prints every record it receives.
   */
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.createLocalEnvironment()
    val tEnv = TableEnvironment.getTableEnvironment(env)
    env.setParallelism(1)
    // Lines with fewer than three comma-separated fields make tokens(2) throw.
    val input = env.socketTextStream("localhost", 9099).map { x =>
      val tokens = x.split(",")
      DemoSource(tokens(0), tokens(1), tokens(2))
    }
    tEnv.registerDataStream("demoSource", input, 'record_time, 'user_id, 'page_id)
    val fieldNames = Array("record_time", "pv", "uv")
    // Build the array at the element type the sink API expects instead of
    // casting an inferred array with asInstanceOf.
    val fieldTypes = Array[TypeInformation[_]](Types.STRING, Types.LONG, Types.LONG)
    tEnv.registerTableSink("demoSink", fieldNames, fieldTypes, MyPrintSink(fieldNames, fieldTypes))
    // SUBSTRING(record_time, 1, 16) keeps the first 16 characters,
    // e.g. "2018-05-24 21:34:02" -> "2018-05-24 21:34", i.e. a per-minute key.
    tEnv.sqlUpdate(
      """
        |INSERT INTO demoSink
        |SELECT
        | SUBSTRING(record_time, 1, 16) as record_time,
        | count(user_id) as pv,
        | count(DISTINCT user_id) as uv
        |FROM demoSource
        |GROUP BY SUBSTRING(record_time, 1, 16)
""".stripMargin)
    env.execute()
  }

  /** One parsed input line. */
  case class DemoSource(record_time: String, user_id: String, page_id: String)
}
/**
 * Upsert table sink used by the reproduction: every (flag, row) pair it
 * receives is printed via PrintSinkFunction.
 *
 * @param fNames field names of the emitted rows
 * @param fTypes field types of the emitted rows
 */
case class MyPrintSink(var fNames: Array[String], var fTypes: Array[TypeInformation[_]]) extends UpsertStreamTableSink[Row] {
  // Key fields are ignored; this sink only prints what it receives.
  override def setKeyFields(keys: Array[String]): Unit = {}

  // The append-only hint is ignored as well.
  override def setIsAppendOnly(isAppendOnly: lang.Boolean): Unit = {}

  override def getRecordType: TypeInformation[Row] = new RowTypeInfo(fTypes, fNames)

  override def emitDataStream(dataStream: DataStream[tuple.Tuple2[lang.Boolean, Row]]): Unit =
    dataStream.addSink(new PrintSinkFunction())

  override def getFieldNames: Array[String] = fNames

  override def getFieldTypes: Array[TypeInformation[_]] = fTypes

  // Return a configured copy rather than building one with the old fields and mutating it.
  override def configure(
      fieldNames: Array[String],
      fieldTypes: Array[TypeInformation[_]]): TableSink[tuple.Tuple2[lang.Boolean, Row]] =
    copy(fNames = fieldNames, fTypes = fieldTypes)
}{code}
When the application starts, I type records into a netcat client one at a time. The table below shows the output for each input record:
||input||output||
|2018-05-24 21:34:02,8,9|(true,2018-05-24 21:34,1,1)|
|2018-05-24 21:34:12,6,6|(true,2018-05-24 21:34,2,2)|
|2018-05-24 21:34:12,0,9|(true,2018-05-24 21:34,3,3)|
|2018-05-24 21:34:12,0,4|{color:#ff0000}(true,2018-05-24 21:34,2,2){color}
(true,2018-05-24 21:34,4,3)|
When the fourth record is consumed, the sink prints two output records; the first one (shown in red) is clearly redundant. Tracing through the source code, I found the problem in
{code:java}
org.apache.flink.table.runtime.aggregate.GroupAggProcessFunction#processElement
{code}
!image-2018-05-29-11-51-20-671.png!
I think that when {{(!generateRetraction) && !inputC.change}} is true, we should not invoke {{out.collect}} here.
[~StephanEwen] please take a look at this.
was:
The output seems incorrect in my Table API upsert-sink example. Here is the code:
{code:java}
object VerifyUpsert {
  /**
   * Reproduction driver for FLINK-9460.
   *
   * Reads comma-separated `record_time,user_id,page_id` lines from a socket on
   * localhost:9099, computes per-minute pv (`count(user_id)`) and uv
   * (`count(DISTINCT user_id)`) with SQL, and writes the result into
   * [[MyPrintSink]], which prints every record it receives.
   */
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.createLocalEnvironment()
    val tEnv = TableEnvironment.getTableEnvironment(env)
    env.setParallelism(1)
    // Lines with fewer than three comma-separated fields make tokens(2) throw.
    val input = env.socketTextStream("localhost", 9099).map { x =>
      val tokens = x.split(",")
      DemoSource(tokens(0), tokens(1), tokens(2))
    }
    tEnv.registerDataStream("demoSource", input, 'record_time, 'user_id, 'page_id)
    val fieldNames = Array("record_time", "pv", "uv")
    // Build the array at the element type the sink API expects instead of
    // casting an inferred array with asInstanceOf.
    val fieldTypes = Array[TypeInformation[_]](Types.STRING, Types.LONG, Types.LONG)
    tEnv.registerTableSink("demoSink", fieldNames, fieldTypes, MyPrintSink(fieldNames, fieldTypes))
    // SUBSTRING(record_time, 1, 16) keeps the first 16 characters,
    // e.g. "2018-05-24 21:34:02" -> "2018-05-24 21:34", i.e. a per-minute key.
    tEnv.sqlUpdate(
      """
        |INSERT INTO demoSink
        |SELECT
        | SUBSTRING(record_time, 1, 16) as record_time,
        | count(user_id) as pv,
        | count(DISTINCT user_id) as uv
        |FROM demoSource
        |GROUP BY SUBSTRING(record_time, 1, 16)
""".stripMargin)
    env.execute()
  }

  /** One parsed input line. */
  case class DemoSource(record_time: String, user_id: String, page_id: String)
}
/**
 * Upsert table sink used by the reproduction: every (flag, row) pair it
 * receives is printed via PrintSinkFunction.
 *
 * @param fNames field names of the emitted rows
 * @param fTypes field types of the emitted rows
 */
case class MyPrintSink(var fNames: Array[String], var fTypes: Array[TypeInformation[_]]) extends UpsertStreamTableSink[Row] {
  // Key fields are ignored; this sink only prints what it receives.
  override def setKeyFields(keys: Array[String]): Unit = {}

  // The append-only hint is ignored as well.
  override def setIsAppendOnly(isAppendOnly: lang.Boolean): Unit = {}

  override def getRecordType: TypeInformation[Row] = new RowTypeInfo(fTypes, fNames)

  override def emitDataStream(dataStream: DataStream[tuple.Tuple2[lang.Boolean, Row]]): Unit =
    dataStream.addSink(new PrintSinkFunction())

  override def getFieldNames: Array[String] = fNames

  override def getFieldTypes: Array[TypeInformation[_]] = fTypes

  // Return a configured copy rather than building one with the old fields and mutating it.
  override def configure(
      fieldNames: Array[String],
      fieldTypes: Array[TypeInformation[_]]): TableSink[tuple.Tuple2[lang.Boolean, Row]] =
    copy(fNames = fieldNames, fTypes = fieldTypes)
}{code}
When the application starts, I type records into a netcat client one at a time. The table below shows the output for each input record:
||input||output||
|2018-05-24 21:34:02,8,9|(true,2018-05-24 21:34,1,1)|
|2018-05-24 21:34:12,6,6|(true,2018-05-24 21:34,2,2)|
|2018-05-24 21:34:12,0,9|(true,2018-05-24 21:34,3,3)|
|2018-05-24 21:34:12,0,4|{color:#FF0000}(true,2018-05-24 21:34,2,2){color}
(true,2018-05-24 21:34,4,3)|
When the fourth record is consumed, the sink prints two output records; the first one (shown in red) is clearly redundant. Tracing through the source code, I found the problem in
{code:java}
org.apache.flink.table.runtime.aggregate.GroupAggProcessFunction#processElement
{code}
!image-2018-05-29-11-51-20-671.png!
I think that when {{(!generateRetraction) && !inputC.change}} is true, we should not invoke {{out.collect}} here.
[~astephan] please look over this
> Redundant output in table & upsert semantics
> --------------------------------------------
>
> Key: FLINK-9460
> URL: https://issues.apache.org/jira/browse/FLINK-9460
> Project: Flink
> Issue Type: Bug
> Components: Table API & SQL
> Affects Versions: 1.5.0
> Reporter: zhengcanbin
> Priority: Major
> Labels: patch
> Fix For: 1.6.0
>
> Attachments: image-2018-05-29-11-39-45-698.png, image-2018-05-29-11-51-20-671.png
>
>
> The output seems incorrect in my Table API upsert-sink example. Here is the code:
> {code:java}
> object VerifyUpsert {
>   /**
>    * Reproduction driver for FLINK-9460.
>    *
>    * Reads comma-separated `record_time,user_id,page_id` lines from a socket on
>    * localhost:9099, computes per-minute pv (`count(user_id)`) and uv
>    * (`count(DISTINCT user_id)`) with SQL, and writes the result into
>    * [[MyPrintSink]], which prints every record it receives.
>    */
>   def main(args: Array[String]): Unit = {
>     val env = StreamExecutionEnvironment.createLocalEnvironment()
>     val tEnv = TableEnvironment.getTableEnvironment(env)
>     env.setParallelism(1)
>     // Lines with fewer than three comma-separated fields make tokens(2) throw.
>     val input = env.socketTextStream("localhost", 9099).map { x =>
>       val tokens = x.split(",")
>       DemoSource(tokens(0), tokens(1), tokens(2))
>     }
>     tEnv.registerDataStream("demoSource", input, 'record_time, 'user_id, 'page_id)
>     val fieldNames = Array("record_time", "pv", "uv")
>     // Build the array at the element type the sink API expects instead of
>     // casting an inferred array with asInstanceOf.
>     val fieldTypes = Array[TypeInformation[_]](Types.STRING, Types.LONG, Types.LONG)
>     tEnv.registerTableSink("demoSink", fieldNames, fieldTypes, MyPrintSink(fieldNames, fieldTypes))
>     // SUBSTRING(record_time, 1, 16) keeps the first 16 characters,
>     // e.g. "2018-05-24 21:34:02" -> "2018-05-24 21:34", i.e. a per-minute key.
>     tEnv.sqlUpdate(
>       """
>         |INSERT INTO demoSink
>         |SELECT
>         | SUBSTRING(record_time, 1, 16) as record_time,
>         | count(user_id) as pv,
>         | count(DISTINCT user_id) as uv
>         |FROM demoSource
>         |GROUP BY SUBSTRING(record_time, 1, 16)
> """.stripMargin)
>     env.execute()
>   }
>
>   /** One parsed input line. */
>   case class DemoSource(record_time: String, user_id: String, page_id: String)
> }
> /**
>  * Upsert table sink used by the reproduction: every (flag, row) pair it
>  * receives is printed via PrintSinkFunction.
>  *
>  * @param fNames field names of the emitted rows
>  * @param fTypes field types of the emitted rows
>  */
> case class MyPrintSink(var fNames: Array[String], var fTypes: Array[TypeInformation[_]]) extends UpsertStreamTableSink[Row] {
>   // Key fields are ignored; this sink only prints what it receives.
>   override def setKeyFields(keys: Array[String]): Unit = {}
>
>   // The append-only hint is ignored as well.
>   override def setIsAppendOnly(isAppendOnly: lang.Boolean): Unit = {}
>
>   override def getRecordType: TypeInformation[Row] = new RowTypeInfo(fTypes, fNames)
>
>   override def emitDataStream(dataStream: DataStream[tuple.Tuple2[lang.Boolean, Row]]): Unit =
>     dataStream.addSink(new PrintSinkFunction())
>
>   override def getFieldNames: Array[String] = fNames
>
>   override def getFieldTypes: Array[TypeInformation[_]] = fTypes
>
>   // Return a configured copy rather than building one with the old fields and mutating it.
>   override def configure(
>       fieldNames: Array[String],
>       fieldTypes: Array[TypeInformation[_]]): TableSink[tuple.Tuple2[lang.Boolean, Row]] =
>     copy(fNames = fieldNames, fTypes = fieldTypes)
> }{code}
> When the application starts, I type records into a netcat client one at a time. The table below shows the output for each input record:
>
> ||input||output||
> |2018-05-24 21:34:02,8,9|(true,2018-05-24 21:34,1,1)|
> |2018-05-24 21:34:12,6,6|(true,2018-05-24 21:34,2,2)|
> |2018-05-24 21:34:12,0,9|(true,2018-05-24 21:34,3,3)|
> |2018-05-24 21:34:12,0,4|{color:#ff0000}(true,2018-05-24 21:34,2,2){color}
> (true,2018-05-24 21:34,4,3)|
>
> When the fourth record is consumed, the sink prints two output records; the first one (shown in red) is clearly redundant. Tracing through the source code, I found the problem in
>
> {code:java}
> org.apache.flink.table.runtime.aggregate.GroupAggProcessFunction#processElement
> {code}
> !image-2018-05-29-11-51-20-671.png!
> I think that when {{(!generateRetraction) && !inputC.change}} is true, we should not invoke {{out.collect}} here.
>
> [~StephanEwen] please take a look at this.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)