Posted to issues@flink.apache.org by "Fabian Hueske (JIRA)" <ji...@apache.org> on 2018/02/08 09:30:00 UTC

[jira] [Comment Edited] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.

    [ https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16356714#comment-16356714 ] 

Fabian Hueske edited comment on FLINK-8577 at 2/8/18 9:29 AM:
--------------------------------------------------------------

I was thinking about how to handle deletion flags. I see three options:
 # always require deletion flags. Append-only DataStreams would need a map operation that adds the flag.
 # have special methods for upsert with and without deletion flags
 # have a mandatory parameter that determines whether the input has flags or not.

So the choices are:

{code}
DataStream[(String, Long, Int)] input = ???
DataStream[(Boolean, (String, Long, Int))] flaggedInput = ???

// WITH DELETE FLAGS

// always require flags
table = tEnv.upsertFromStream(flaggedInput, 'a, 'b, 'c.key)
// special method
table = tEnv.upsertFromStreamWithDeletes(flaggedInput, 'a, 'b, 'c.key)
// mandatory parameter
table = tEnv.upsertFromStream(flaggedInput, deletes = true, 'a, 'b, 'c.key)

// WITHOUT DELETE FLAGS

// always require flags, so add them manually
table = tEnv.upsertFromStream(input.map(new InsertFlagger()), 'a, 'b, 'c.key)
// special method
table = tEnv.upsertFromStream(input, 'a, 'b, 'c.key)
// mandatory parameter
table = tEnv.upsertFromStream(input, deletes = false, 'a, 'b, 'c.key)
{code}

I prefer option 1 because it keeps the API slim and is consistent with the format of the UpsertTableSink.
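
To make the "add them manually" variant concrete, here is a minimal sketch of what the {{InsertFlagger}} mentioned above might do. It is only an illustration: it uses {{java.util.function.Function}} and {{Map.Entry}} as stand-ins so that it runs without Flink on the classpath, whereas a real implementation would presumably implement Flink's {{MapFunction}} and emit a {{Tuple2}}. The class name, the {{flagAll}} helper, and the convention that {{true}} means upsert are assumptions.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

final class InsertFlaggerSketch {

    /**
     * Hypothetical stand-in for InsertFlagger: wraps each record of an
     * append-only stream in a (flag, record) pair.
     */
    static final class InsertFlagger<T> implements Function<T, Map.Entry<Boolean, T>> {
        @Override
        public Map.Entry<Boolean, T> apply(T record) {
            // Assumption: true marks an upsert, false would mark a delete.
            return new SimpleImmutableEntry<>(Boolean.TRUE, record);
        }
    }

    /** Helper for this sketch only: flags every record in a list as an upsert. */
    static <T> List<Map.Entry<Boolean, T>> flagAll(List<T> records) {
        return records.stream()
                .map(new InsertFlagger<T>())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Every record comes out paired with the upsert flag.
        System.out.println(flagAll(List.of("a", "b")));
    }
}
```

With option 1, every append-only input would pay for this extra map step; options 2 and 3 avoid it at the cost of a wider API.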

What do you think [~hequn8128], [~twalthr]?



> Implement proctime DataStream to Table upsert conversion.
> ---------------------------------------------------------
>
>                 Key: FLINK-8577
>                 URL: https://issues.apache.org/jira/browse/FLINK-8577
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>            Reporter: Hequn Cheng
>            Assignee: Hequn Cheng
>            Priority: Major
>
> The API will look like:
> {code:java}
> DataStream[(String, Long, Int)] input = ???
> // upsert with key
> Table table = tEnv.upsertFromStream(input, 'a, 'b, 'c.key)
> // upsert without key -> single row table
> Table table = tEnv.upsertFromStream(input, 'a, 'b, 'c)
> {code}
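
As a rough illustration of the semantics implied by the description above, the sketch below materializes a stream of upsert messages into a table held in a plain map. It does not use Flink; the class name, the {{apply}} method, and the row strings are hypothetical, and the flag convention ({{true}} = upsert, {{false}} = delete) is assumed to mirror the upsert sink format.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class UpsertMaterializationSketch {

    /** Applies one (flag, key, row) message to the materialized table. */
    static void apply(Map<Integer, String> table, boolean upsert, int key, String row) {
        if (upsert) {
            table.put(key, row);   // insert a new row or overwrite the row for this key
        } else {
            table.remove(key);     // delete message: drop the row for this key
        }
    }

    public static void main(String[] args) {
        // Keyed case: one row per distinct key.
        Map<Integer, String> keyed = new LinkedHashMap<>();
        apply(keyed, true, 1, "(x, 10, 1)");
        apply(keyed, true, 2, "(y, 20, 2)");
        apply(keyed, true, 1, "(x, 11, 1)");   // overwrites the row for key 1
        apply(keyed, false, 2, "(y, 20, 2)");  // deletes the row for key 2
        System.out.println(keyed);

        // Keyless case, modeled here as all messages sharing one constant key:
        // the table never holds more than a single row.
        Map<Integer, String> single = new LinkedHashMap<>();
        apply(single, true, 0, "(x, 10, 1)");
        apply(single, true, 0, "(y, 20, 2)");
        System.out.println(single);
    }
}
```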


