Posted to issues@flink.apache.org by "Fabian Hueske (JIRA)" <ji...@apache.org> on 2018/02/08 09:30:00 UTC
[jira] [Comment Edited] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.
[ https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16356714#comment-16356714 ]
Fabian Hueske edited comment on FLINK-8577 at 2/8/18 9:29 AM:
--------------------------------------------------------------
I was thinking about how to handle deletion flags. I see three options:
# Always require deletion flags. Append-only DataStreams would need a map to add the flag.
# Have separate methods for upsert with and without deletion flags.
# Have a mandatory parameter that specifies whether the input has flags or not.
So the choices are:
{code}
DataStream[(String, Long, Int)] input = ???
DataStream[(Boolean, (String, Long, Int))] flaggedInput = ???
// WITH DELETE FLAGS
// always require flags
table = tEnv.upsertFromStream(flaggedInput, 'a, 'b, 'c.key)
// special method
table = tEnv.upsertFromStreamWithDeletes(flaggedInput, 'a, 'b, 'c.key)
// mandatory parameter.
table = tEnv.upsertFromStream(flaggedInput, deletes = true, 'a, 'b, 'c.key)
// WITHOUT DELETE FLAGS
// always require flags, so add them manually
table = tEnv.upsertFromStream(input.map(new InsertFlagger()), 'a, 'b, 'c.key)
// special method
table = tEnv.upsertFromStream(input, 'a, 'b, 'c.key)
// mandatory parameter
table = tEnv.upsertFromStream(input, deletes = false, 'a, 'b, 'c.key)
{code}
I prefer option 1 because it keeps the API slim and is consistent with the format of the UpsertTableSink.
What do you think [~hequn8128], [~twalthr]?
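To make option 1 concrete, here is a minimal sketch in plain Java with no Flink dependency. It assumes the flag convention of the upsert sink format (true = upsert, false = delete). The class UpsertFlagSketch and the helpers flagged, flagAsInsert and materialize are hypothetical names used only for illustration; flagAsInsert stands in for what the InsertFlagger map above might do, and materialize only mimics how a keyed table could interpret the flags. None of this is the proposed upsertFromStream implementation.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch, not Flink API: models flagged records as (Boolean, T) pairs.
final class UpsertFlagSketch {

    // Wraps a record with a flag. Assumed convention: true = upsert, false = delete.
    static <T> Map.Entry<Boolean, T> flagged(boolean upsert, T record) {
        return new SimpleEntry<>(upsert, record);
    }

    // Stand-in for the InsertFlagger map: an append-only input only ever upserts.
    static <T> Map.Entry<Boolean, T> flagAsInsert(T record) {
        return flagged(true, record);
    }

    // Applies the flagged changes in order to a keyed view of the table.
    static <K, T> Map<K, T> materialize(List<Map.Entry<Boolean, T>> changes,
                                        Function<T, K> keyOf) {
        Map<K, T> table = new LinkedHashMap<>();
        for (Map.Entry<Boolean, T> change : changes) {
            K key = keyOf.apply(change.getValue());
            if (change.getKey()) {
                table.put(key, change.getValue());   // upsert: insert or replace by key
            } else {
                table.remove(key);                   // delete: drop the row for this key
            }
        }
        return table;
    }

    public static void main(String[] args) {
        List<Map.Entry<Boolean, Map.Entry<String, Long>>> changes = new ArrayList<>();
        changes.add(flagAsInsert(Map.entry("k1", 1L)));
        changes.add(flagAsInsert(Map.entry("k2", 5L)));
        changes.add(flagAsInsert(Map.entry("k1", 2L)));    // replaces the earlier k1 row
        changes.add(flagged(false, Map.entry("k2", 5L)));  // deletes the k2 row

        Map<String, Map.Entry<String, Long>> table =
                materialize(changes, Map.Entry::getKey);
        System.out.println(table.keySet());
    }
}
```

With option 1, an append-only input pays for one extra map (flagAsInsert here), while the conversion itself handles a single input shape.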
> Implement proctime DataStream to Table upsert conversion.
> ---------------------------------------------------------
>
> Key: FLINK-8577
> URL: https://issues.apache.org/jira/browse/FLINK-8577
> Project: Flink
> Issue Type: Sub-task
> Components: Table API & SQL
> Reporter: Hequn Cheng
> Assignee: Hequn Cheng
> Priority: Major
>
> The API will look like:
> {code:java}
> DataStream[(String, Long, Int)] input = ???
> // upsert with key
> Table table = tEnv.upsertFromStream(input, 'a, 'b, 'c.key)
> // upsert without key -> single row table
> Table table = tEnv.upsertFromStream(input, 'a, 'b, 'c)
> {code}