You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by shkob1 <sh...@gmail.com> on 2019/05/08 22:36:28 UTC

Reconstruct object through partial select query

Hey,

I'm trying to create a SQL query which, given input from a stream with
generic class T type will create a new stream which will be in the structure
of 
{
  origin : T
  resultOfSomeSQLCalc : Array[String]
}

it seems that just by doing "SELECT *" i can convert the resulting table
back to a stream of the origin object. 
Im trying to understand how do i create that envelope with the origin
object. Something along the lines of "SELECT box(*), array(..)". I'm sensing
i need a function for that, but not sure how that looks.

Thanks for the help!
Shahar




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Reconstruct object through partial select query

Posted by Fabian Hueske <fh...@gmail.com>.
Hi,

This looks like a good solution to me.
The conversion mappers in step 1. and 4. should not cause a lot of overhead
as they are chained to their predecessors.

Best, Fabian

Am Di., 14. Mai 2019 um 01:08 Uhr schrieb Shahar Cizer Kobrinsky <
shahar.kobrinsky@gmail.com>:

> Hey Hequn & Fabian,
>
> It seems like i found a reasonable way using both Row and my own TypeInfo:
>
>    - I started by just using my own TypeInfo using your example. So i'm
>    using a serializer which is basically a compound of the original event type
>    serializer as well as a string array serializer
>    (used BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO.createSerializer for it).
>    the rest is pretty much boiler plate similar to the MapSerializer (as well
>    as the snapshot class). I extended TypeInformation<TaggedEvent<T>> and the
>    downside was that unlike Row/Pojo i didnt have the field names for the SQL
>    Query so i went back looking at Row.
>
>    - I realized that i can use Row without knowing the internals of the
>    origin event class in advance basically mapping to Row with an explicit
>    RowTypeInfo as follows:
>
>    final TypeInformation<Row> rowTypeInformation = Types.ROW(new String[]{"originalEvent", "tags"},
>            new TypeInformation[]{dataStream.getType(), BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO});
>
>
>    final SingleOutputStreamOperator<Row> mappedStream = this.dataStream.map((MapFunction<T, Row>) value -> {
>        final Row row = new Row(2);
>        row.setField(0, value);
>        row.setField(1, new String[0]);
>        return row;
>    }).returns(rowTypeInformation);
>
>    tableEnvironment.registerDataStream(ORIGIN_EVENT_STREAM_TABLE_NAME, mappedStream);
>
>
> So that worked well.
>
>
>    - Giving last thought to it i didnt want to leave my users with a Row
>    stream but indeed with a Pojo for it, so i ended up keeping the
>    TaggedEventTypeInfo as an additional mapping past the SQL query.
>    Therefore my process is:
>    1. Map DataStream<T> to Row, register as a table (code above)
>    2. Run SQL over the table
>    3. Transform result to an append stream of Row (trying to directly
>    convert to my new type info results with *Requested result type is an
>    atomic type but result[..] has more or less than a single field*. not
>    sure if there's a better way
>    4. Map the Row object to TaggedEvent<T> explicitly implementing
>    ResultTypeQueryable.getProducedType to use the TaggedEventTypeInfo
>
>
> Would love to get feedback on that, whether that seems like the best
> solution or can it be more efficient.
>
> Thanks!
> Shahar
>
>
> On Mon, May 13, 2019 at 9:25 AM Shahar Cizer Kobrinsky <
> shahar.kobrinsky@gmail.com> wrote:
>
>> Thanks for looking into it Hequn!
>>
>> I do not have a requirement to use TaggedEvent vs Row. But correct me if
>> I am wrong, creating a Row will require me knowing the internal fields of
>> the original event in compile time, is that correct? I do have a
>> requirement to support a generic original event type, so unless i can map T
>> to a Row without knowing the object fields, i wont be able to use it. Can
>> you confirm that?
>> I will look at MapView and let you know, thanks again!
>>
>> On Sun, May 12, 2019 at 1:30 AM Hequn Cheng <ch...@gmail.com> wrote:
>>
>>> Hi shahar,
>>>
>>> An easier way to solve your problem is to use a Row to store your data
>>> instead of the `TaggedEvent `. I think this is what Fabian means. In this
>>> way, you don't have to define the user-defined TypeFactory and use the Row
>>> type directly. Take `TaggedEvent<Car>` as an example, the corresponding row
>>> type is `Types.ROW(Types.ROW(Types.INT, Types.STRING),
>>> Types.OBJECT_ARRAY(Types.STRING))` in which Types is
>>> `org.apache.flink.table.api.Types`. Furthermore, row type is also easier to
>>> cooperate with Table API & SQL.
>>>
>>> However, if the `TaggedEvent` is a must-have for you, you can take a
>>> look at the MapView[1] as an example of how to define a user-defined table
>>> factory.
>>>
>>> Best, Hequn
>>>
>>> [1]
>>> https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/dataview/MapView.scala
>>>
>>> On Sat, May 11, 2019 at 1:20 AM Shahar Cizer Kobrinsky <
>>> shahar.kobrinsky@gmail.com> wrote:
>>>
>>>> Hi Fabian,
>>>>
>>>> I have a trouble implementing the type for this operation, i wonder how
>>>> i can do that.
>>>> So given generic type T i want to create a TypeInformation for:
>>>> class TaggedEvent<T> {
>>>>    String[] tags
>>>>    T originalEvent
>>>> }
>>>>
>>>> Was trying a few different things but not sure how to do it.
>>>> Doesn't seem like i can use TypeHint as i need to know the actual
>>>> generics class for it, right?
>>>> Do i need a TaggedEventTypeFactory? If so, how do i create the
>>>> TaggedEventTypeInfo for it?  do you have an example for it? was trying to
>>>> follow this[1] but doesn't seem to really work. I'm getting null as my
>>>> genericParameter for some reason. Also, how would you create the serializer
>>>> for the type info? can i reuse some builtin Kryo functionality?
>>>>
>>>> Thanks
>>>>
>>>>
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/types_serialization.html#creating-a-typeinformation-or-typeserializer
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On Thu, May 9, 2019 at 9:08 AM Shahar Cizer Kobrinsky <
>>>> shahar.kobrinsky@gmail.com> wrote:
>>>>
>>>>> Thanks Fabian,
>>>>>
>>>>> I'm looking into a way to enrich it without having to know the
>>>>> internal fields of the original event type.
>>>>> Right now what I managed to do is to map Car into a TaggedEvent<Car>
>>>>> prior to the SQL query, tags being empty, then run the SQL query selecting *origin,
>>>>> enrich(.. ) as tags*
>>>>> Not sure there's a better way but i guess that works
>>>>>
>>>>>
>>>>>
>>>>> On Thu, May 9, 2019 at 12:50 AM Fabian Hueske <fh...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> you can use the value construction function ROW to create a nested
>>>>>> row (or object).
>>>>>> However, you have to explicitly reference all attributes that you
>>>>>> will add.
>>>>>>
>>>>>> If you have a table Cars with (year, modelName) a query could look
>>>>>> like this:
>>>>>>
>>>>>> SELECT
>>>>>>   ROW(year, modelName) AS car,
>>>>>>   enrich(year, modelName) AS tags
>>>>>> FROM Cars;
>>>>>>
>>>>>> Handling many attributes is always a bit painful in SQL.
>>>>>> There is an effort to make the Table API easier to use for these use
>>>>>> cases (for example Column Operations [1]).
>>>>>>
>>>>>> Best, Fabian
>>>>>>
>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-11967
>>>>>>
>>>>>>
>>>>>>
>>>>>> Am Do., 9. Mai 2019 um 01:44 Uhr schrieb shkob1 <
>>>>>> shahar.kobrinsky@gmail.com>:
>>>>>>
>>>>>>> Just to be more clear on my goal -
>>>>>>> Im trying to enrich the incoming stream with some meaningful tags
>>>>>>> based on
>>>>>>> conditions from the event itself.
>>>>>>> So the input stream could be an event looks like:
>>>>>>> Class Car {
>>>>>>>   int year;
>>>>>>>   String modelName;
>>>>>>> }
>>>>>>>
>>>>>>> i will have a config that are defining tags as:
>>>>>>> "NiceCar" -> "year > 2015 AND position("Luxury" in modelName) > 0"
>>>>>>>
>>>>>>> So ideally my output will be in the structure of
>>>>>>>
>>>>>>> Class TaggedEvent<Car> {
>>>>>>>    Car origin;
>>>>>>>    String[] tags;
>>>>>>> }
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Sent from:
>>>>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>>>>>>
>>>>>>

Re: Reconstruct object through partial select query

Posted by Shahar Cizer Kobrinsky <sh...@gmail.com>.
Hey Hequn & Fabian,

It seems like i found a reasonable way using both Row and my own TypeInfo:

   - I started by just using my own TypeInfo using your example. So i'm
   using a serializer which is basically a compound of the original event type
   serializer as well as a string array serializer
   (used BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO.createSerializer for it).
   the rest is pretty much boiler plate similar to the MapSerializer (as well
   as the snapshot class). I extended TypeInformation<TaggedEvent<T>> and the
   downside was that unlike Row/Pojo i didnt have the field names for the SQL
   Query so i went back looking at Row.

   - I realized that i can use Row without knowing the internals of the
   origin event class in advance basically mapping to Row with an explicit
   RowTypeInfo as follows:

   final TypeInformation<Row> rowTypeInformation = Types.ROW(new
String[]{"originalEvent", "tags"},
           new TypeInformation[]{dataStream.getType(),
BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO});


   final SingleOutputStreamOperator<Row> mappedStream =
this.dataStream.map((MapFunction<T, Row>) value -> {
       final Row row = new Row(2);
       row.setField(0, value);
       row.setField(1, new String[0]);
       return row;
   }).returns(rowTypeInformation);

   tableEnvironment.registerDataStream(ORIGIN_EVENT_STREAM_TABLE_NAME,
mappedStream);


So that worked well.


   - Giving last thought to it i didnt want to leave my users with a Row
   stream but indeed with a Pojo for it, so i ended up keeping the
   TaggedEventTypeInfo as an additional mapping past the SQL query.
   Therefore my process is:
   1. Map DataStream<T> to Row, register as a table (code above)
   2. Run SQL over the table
   3. Transform result to an append stream of Row (trying to directly
   convert to my new type info results with *Requested result type is an
   atomic type but result[..] has more or less than a single field*. not
   sure if there's a better way
   4. Map the Row object to TaggedEvent<T> explicitly implementing
   ResultTypeQueryable.getProducedType to use the TaggedEventTypeInfo


Would love to get feedback on that, whether that seems like the best
solution or can it be more efficient.

Thanks!
Shahar


On Mon, May 13, 2019 at 9:25 AM Shahar Cizer Kobrinsky <
shahar.kobrinsky@gmail.com> wrote:

> Thanks for looking into it Hequn!
>
> I do not have a requirement to use TaggedEvent vs Row. But correct me if I
> am wrong, creating a Row will require me knowing the internal fields of the
> original event in compile time, is that correct? I do have a requirement to
> support a generic original event type, so unless i can map T to a Row
> without knowing the object fields, i wont be able to use it. Can you
> confirm that?
> I will look at MapView and let you know, thanks again!
>
> On Sun, May 12, 2019 at 1:30 AM Hequn Cheng <ch...@gmail.com> wrote:
>
>> Hi shahar,
>>
>> An easier way to solve your problem is to use a Row to store your data
>> instead of the `TaggedEvent `. I think this is what Fabian means. In this
>> way, you don't have to define the user-defined TypeFactory and use the Row
>> type directly. Take `TaggedEvent<Car>` as an example, the corresponding row
>> type is `Types.ROW(Types.ROW(Types.INT, Types.STRING),
>> Types.OBJECT_ARRAY(Types.STRING))` in which Types is
>> `org.apache.flink.table.api.Types`. Furthermore, row type is also easier to
>> cooperate with Table API & SQL.
>>
>> However, if the `TaggedEvent` is a must-have for you, you can take a look
>> at the MapView[1] as an example of how to define a user-defined table
>> factory.
>>
>> Best, Hequn
>>
>> [1]
>> https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/dataview/MapView.scala
>>
>> On Sat, May 11, 2019 at 1:20 AM Shahar Cizer Kobrinsky <
>> shahar.kobrinsky@gmail.com> wrote:
>>
>>> Hi Fabian,
>>>
>>> I have a trouble implementing the type for this operation, i wonder how
>>> i can do that.
>>> So given generic type T i want to create a TypeInformation for:
>>> class TaggedEvent<T> {
>>>    String[] tags
>>>    T originalEvent
>>> }
>>>
>>> Was trying a few different things but not sure how to do it.
>>> Doesn't seem like i can use TypeHint as i need to know the actual
>>> generics class for it, right?
>>> Do i need a TaggedEventTypeFactory? If so, how do i create the
>>> TaggedEventTypeInfo for it?  do you have an example for it? was trying to
>>> follow this[1] but doesn't seem to really work. I'm getting null as my
>>> genericParameter for some reason. Also, how would you create the serializer
>>> for the type info? can i reuse some builtin Kryo functionality?
>>>
>>> Thanks
>>>
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/types_serialization.html#creating-a-typeinformation-or-typeserializer
>>>
>>>
>>>
>>>
>>>
>>> On Thu, May 9, 2019 at 9:08 AM Shahar Cizer Kobrinsky <
>>> shahar.kobrinsky@gmail.com> wrote:
>>>
>>>> Thanks Fabian,
>>>>
>>>> I'm looking into a way to enrich it without having to know the internal
>>>> fields of the original event type.
>>>> Right now what I managed to do is to map Car into a TaggedEvent<Car>
>>>> prior to the SQL query, tags being empty, then run the SQL query selecting *origin,
>>>> enrich(.. ) as tags*
>>>> Not sure there's a better way but i guess that works
>>>>
>>>>
>>>>
>>>> On Thu, May 9, 2019 at 12:50 AM Fabian Hueske <fh...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> you can use the value construction function ROW to create a nested row
>>>>> (or object).
>>>>> However, you have to explicitly reference all attributes that you will
>>>>> add.
>>>>>
>>>>> If you have a table Cars with (year, modelName) a query could look
>>>>> like this:
>>>>>
>>>>> SELECT
>>>>>   ROW(year, modelName) AS car,
>>>>>   enrich(year, modelName) AS tags
>>>>> FROM Cars;
>>>>>
>>>>> Handling many attributes is always a bit painful in SQL.
>>>>> There is an effort to make the Table API easier to use for these use
>>>>> cases (for example Column Operations [1]).
>>>>>
>>>>> Best, Fabian
>>>>>
>>>>> [1] https://issues.apache.org/jira/browse/FLINK-11967
>>>>>
>>>>>
>>>>>
>>>>> Am Do., 9. Mai 2019 um 01:44 Uhr schrieb shkob1 <
>>>>> shahar.kobrinsky@gmail.com>:
>>>>>
>>>>>> Just to be more clear on my goal -
>>>>>> Im trying to enrich the incoming stream with some meaningful tags
>>>>>> based on
>>>>>> conditions from the event itself.
>>>>>> So the input stream could be an event looks like:
>>>>>> Class Car {
>>>>>>   int year;
>>>>>>   String modelName;
>>>>>> }
>>>>>>
>>>>>> i will have a config that are defining tags as:
>>>>>> "NiceCar" -> "year > 2015 AND position("Luxury" in modelName) > 0"
>>>>>>
>>>>>> So ideally my output will be in the structure of
>>>>>>
>>>>>> Class TaggedEvent<Car> {
>>>>>>    Car origin;
>>>>>>    String[] tags;
>>>>>> }
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Sent from:
>>>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>>>>>
>>>>>

Re: Reconstruct object through partial select query

Posted by Shahar Cizer Kobrinsky <sh...@gmail.com>.
Thanks for looking into it Hequn!

I do not have a requirement to use TaggedEvent vs Row. But correct me if I
am wrong, creating a Row will require me knowing the internal fields of the
original event in compile time, is that correct? I do have a requirement to
support a generic original event type, so unless i can map T to a Row
without knowing the object fields, i wont be able to use it. Can you
confirm that?
I will look at MapView and let you know, thanks again!

On Sun, May 12, 2019 at 1:30 AM Hequn Cheng <ch...@gmail.com> wrote:

> Hi shahar,
>
> An easier way to solve your problem is to use a Row to store your data
> instead of the `TaggedEvent `. I think this is what Fabian means. In this
> way, you don't have to define the user-defined TypeFactory and use the Row
> type directly. Take `TaggedEvent<Car>` as an example, the corresponding row
> type is `Types.ROW(Types.ROW(Types.INT, Types.STRING),
> Types.OBJECT_ARRAY(Types.STRING))` in which Types is
> `org.apache.flink.table.api.Types`. Furthermore, row type is also easier to
> cooperate with Table API & SQL.
>
> However, if the `TaggedEvent` is a must-have for you, you can take a look
> at the MapView[1] as an example of how to define a user-defined table
> factory.
>
> Best, Hequn
>
> [1]
> https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/dataview/MapView.scala
>
> On Sat, May 11, 2019 at 1:20 AM Shahar Cizer Kobrinsky <
> shahar.kobrinsky@gmail.com> wrote:
>
>> Hi Fabian,
>>
>> I have a trouble implementing the type for this operation, i wonder how i
>> can do that.
>> So given generic type T i want to create a TypeInformation for:
>> class TaggedEvent<T> {
>>    String[] tags
>>    T originalEvent
>> }
>>
>> Was trying a few different things but not sure how to do it.
>> Doesn't seem like i can use TypeHint as i need to know the actual
>> generics class for it, right?
>> Do i need a TaggedEventTypeFactory? If so, how do i create the
>> TaggedEventTypeInfo for it?  do you have an example for it? was trying to
>> follow this[1] but doesn't seem to really work. I'm getting null as my
>> genericParameter for some reason. Also, how would you create the serializer
>> for the type info? can i reuse some builtin Kryo functionality?
>>
>> Thanks
>>
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/types_serialization.html#creating-a-typeinformation-or-typeserializer
>>
>>
>>
>>
>>
>> On Thu, May 9, 2019 at 9:08 AM Shahar Cizer Kobrinsky <
>> shahar.kobrinsky@gmail.com> wrote:
>>
>>> Thanks Fabian,
>>>
>>> I'm looking into a way to enrich it without having to know the internal
>>> fields of the original event type.
>>> Right now what I managed to do is to map Car into a TaggedEvent<Car>
>>> prior to the SQL query, tags being empty, then run the SQL query selecting *origin,
>>> enrich(.. ) as tags*
>>> Not sure there's a better way but i guess that works
>>>
>>>
>>>
>>> On Thu, May 9, 2019 at 12:50 AM Fabian Hueske <fh...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> you can use the value construction function ROW to create a nested row
>>>> (or object).
>>>> However, you have to explicitly reference all attributes that you will
>>>> add.
>>>>
>>>> If you have a table Cars with (year, modelName) a query could look like
>>>> this:
>>>>
>>>> SELECT
>>>>   ROW(year, modelName) AS car,
>>>>   enrich(year, modelName) AS tags
>>>> FROM Cars;
>>>>
>>>> Handling many attributes is always a bit painful in SQL.
>>>> There is an effort to make the Table API easier to use for these use
>>>> cases (for example Column Operations [1]).
>>>>
>>>> Best, Fabian
>>>>
>>>> [1] https://issues.apache.org/jira/browse/FLINK-11967
>>>>
>>>>
>>>>
>>>> Am Do., 9. Mai 2019 um 01:44 Uhr schrieb shkob1 <
>>>> shahar.kobrinsky@gmail.com>:
>>>>
>>>>> Just to be more clear on my goal -
>>>>> Im trying to enrich the incoming stream with some meaningful tags
>>>>> based on
>>>>> conditions from the event itself.
>>>>> So the input stream could be an event looks like:
>>>>> Class Car {
>>>>>   int year;
>>>>>   String modelName;
>>>>> }
>>>>>
>>>>> i will have a config that are defining tags as:
>>>>> "NiceCar" -> "year > 2015 AND position("Luxury" in modelName) > 0"
>>>>>
>>>>> So ideally my output will be in the structure of
>>>>>
>>>>> Class TaggedEvent<Car> {
>>>>>    Car origin;
>>>>>    String[] tags;
>>>>> }
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Sent from:
>>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>>>>
>>>>

Re: Reconstruct object through partial select query

Posted by Hequn Cheng <ch...@gmail.com>.
Hi shahar,

An easier way to solve your problem is to use a Row to store your data
instead of the `TaggedEvent `. I think this is what Fabian means. In this
way, you don't have to define the user-defined TypeFactory and use the Row
type directly. Take `TaggedEvent<Car>` as an example, the corresponding row
type is `Types.ROW(Types.ROW(Types.INT, Types.STRING),
Types.OBJECT_ARRAY(Types.STRING))` in which Types is
`org.apache.flink.table.api.Types`. Furthermore, row type is also easier to
cooperate with Table API & SQL.

However, if the `TaggedEvent` is a must-have for you, you can take a look
at the MapView[1] as an example of how to define a user-defined table
factory.

Best, Hequn

[1]
https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/dataview/MapView.scala

On Sat, May 11, 2019 at 1:20 AM Shahar Cizer Kobrinsky <
shahar.kobrinsky@gmail.com> wrote:

> Hi Fabian,
>
> I have a trouble implementing the type for this operation, i wonder how i
> can do that.
> So given generic type T i want to create a TypeInformation for:
> class TaggedEvent<T> {
>    String[] tags
>    T originalEvent
> }
>
> Was trying a few different things but not sure how to do it.
> Doesn't seem like i can use TypeHint as i need to know the actual generics
> class for it, right?
> Do i need a TaggedEventTypeFactory? If so, how do i create the
> TaggedEventTypeInfo for it?  do you have an example for it? was trying to
> follow this[1] but doesn't seem to really work. I'm getting null as my
> genericParameter for some reason. Also, how would you create the serializer
> for the type info? can i reuse some builtin Kryo functionality?
>
> Thanks
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/types_serialization.html#creating-a-typeinformation-or-typeserializer
>
>
>
>
>
> On Thu, May 9, 2019 at 9:08 AM Shahar Cizer Kobrinsky <
> shahar.kobrinsky@gmail.com> wrote:
>
>> Thanks Fabian,
>>
>> I'm looking into a way to enrich it without having to know the internal
>> fields of the original event type.
>> Right now what I managed to do is to map Car into a TaggedEvent<Car>
>> prior to the SQL query, tags being empty, then run the SQL query selecting *origin,
>> enrich(.. ) as tags*
>> Not sure there's a better way but i guess that works
>>
>>
>>
>> On Thu, May 9, 2019 at 12:50 AM Fabian Hueske <fh...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> you can use the value construction function ROW to create a nested row
>>> (or object).
>>> However, you have to explicitly reference all attributes that you will
>>> add.
>>>
>>> If you have a table Cars with (year, modelName) a query could look like
>>> this:
>>>
>>> SELECT
>>>   ROW(year, modelName) AS car,
>>>   enrich(year, modelName) AS tags
>>> FROM Cars;
>>>
>>> Handling many attributes is always a bit painful in SQL.
>>> There is an effort to make the Table API easier to use for these use
>>> cases (for example Column Operations [1]).
>>>
>>> Best, Fabian
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-11967
>>>
>>>
>>>
>>> Am Do., 9. Mai 2019 um 01:44 Uhr schrieb shkob1 <
>>> shahar.kobrinsky@gmail.com>:
>>>
>>>> Just to be more clear on my goal -
>>>> Im trying to enrich the incoming stream with some meaningful tags based
>>>> on
>>>> conditions from the event itself.
>>>> So the input stream could be an event looks like:
>>>> Class Car {
>>>>   int year;
>>>>   String modelName;
>>>> }
>>>>
>>>> i will have a config that are defining tags as:
>>>> "NiceCar" -> "year > 2015 AND position("Luxury" in modelName) > 0"
>>>>
>>>> So ideally my output will be in the structure of
>>>>
>>>> Class TaggedEvent<Car> {
>>>>    Car origin;
>>>>    String[] tags;
>>>> }
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> --
>>>> Sent from:
>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>>>
>>>

Re: Reconstruct object through partial select query

Posted by Shahar Cizer Kobrinsky <sh...@gmail.com>.
Hi Fabian,

I have a trouble implementing the type for this operation, i wonder how i
can do that.
So given generic type T i want to create a TypeInformation for:
class TaggedEvent<T> {
   String[] tags
   T originalEvent
}

Was trying a few different things but not sure how to do it.
Doesn't seem like i can use TypeHint as i need to know the actual generics
class for it, right?
Do i need a TaggedEventTypeFactory? If so, how do i create the
TaggedEventTypeInfo for it?  do you have an example for it? was trying to
follow this[1] but doesn't seem to really work. I'm getting null as my
genericParameter for some reason. Also, how would you create the serializer
for the type info? can i reuse some builtin Kryo functionality?

Thanks


[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/types_serialization.html#creating-a-typeinformation-or-typeserializer





On Thu, May 9, 2019 at 9:08 AM Shahar Cizer Kobrinsky <
shahar.kobrinsky@gmail.com> wrote:

> Thanks Fabian,
>
> I'm looking into a way to enrich it without having to know the internal
> fields of the original event type.
> Right now what I managed to do is to map Car into a TaggedEvent<Car> prior
> to the SQL query, tags being empty, then run the SQL query selecting *origin,
> enrich(.. ) as tags*
> Not sure there's a better way but i guess that works
>
>
>
> On Thu, May 9, 2019 at 12:50 AM Fabian Hueske <fh...@gmail.com> wrote:
>
>> Hi,
>>
>> you can use the value construction function ROW to create a nested row
>> (or object).
>> However, you have to explicitly reference all attributes that you will
>> add.
>>
>> If you have a table Cars with (year, modelName) a query could look like
>> this:
>>
>> SELECT
>>   ROW(year, modelName) AS car,
>>   enrich(year, modelName) AS tags
>> FROM Cars;
>>
>> Handling many attributes is always a bit painful in SQL.
>> There is an effort to make the Table API easier to use for these use
>> cases (for example Column Operations [1]).
>>
>> Best, Fabian
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-11967
>>
>>
>>
>> Am Do., 9. Mai 2019 um 01:44 Uhr schrieb shkob1 <
>> shahar.kobrinsky@gmail.com>:
>>
>>> Just to be more clear on my goal -
>>> Im trying to enrich the incoming stream with some meaningful tags based
>>> on
>>> conditions from the event itself.
>>> So the input stream could be an event looks like:
>>> Class Car {
>>>   int year;
>>>   String modelName;
>>> }
>>>
>>> i will have a config that are defining tags as:
>>> "NiceCar" -> "year > 2015 AND position("Luxury" in modelName) > 0"
>>>
>>> So ideally my output will be in the structure of
>>>
>>> Class TaggedEvent<Car> {
>>>    Car origin;
>>>    String[] tags;
>>> }
>>>
>>>
>>>
>>>
>>>
>>>
>>> --
>>> Sent from:
>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>>
>>

Re: Reconstruct object through partial select query

Posted by Shahar Cizer Kobrinsky <sh...@gmail.com>.
Thanks Fabian,

I'm looking into a way to enrich it without having to know the internal
fields of the original event type.
Right now what I managed to do is to map Car into a TaggedEvent<Car> prior
to the SQL query, tags being empty, then run the SQL query selecting *origin,
enrich(.. ) as tags*
Not sure there's a better way but i guess that works



On Thu, May 9, 2019 at 12:50 AM Fabian Hueske <fh...@gmail.com> wrote:

> Hi,
>
> you can use the value construction function ROW to create a nested row (or
> object).
> However, you have to explicitly reference all attributes that you will add.
>
> If you have a table Cars with (year, modelName) a query could look like
> this:
>
> SELECT
>   ROW(year, modelName) AS car,
>   enrich(year, modelName) AS tags
> FROM Cars;
>
> Handling many attributes is always a bit painful in SQL.
> There is an effort to make the Table API easier to use for these use cases
> (for example Column Operations [1]).
>
> Best, Fabian
>
> [1] https://issues.apache.org/jira/browse/FLINK-11967
>
>
>
> Am Do., 9. Mai 2019 um 01:44 Uhr schrieb shkob1 <
> shahar.kobrinsky@gmail.com>:
>
>> Just to be more clear on my goal -
>> Im trying to enrich the incoming stream with some meaningful tags based on
>> conditions from the event itself.
>> So the input stream could be an event looks like:
>> Class Car {
>>   int year;
>>   String modelName;
>> }
>>
>> i will have a config that are defining tags as:
>> "NiceCar" -> "year > 2015 AND position("Luxury" in modelName) > 0"
>>
>> So ideally my output will be in the structure of
>>
>> Class TaggedEvent<Car> {
>>    Car origin;
>>    String[] tags;
>> }
>>
>>
>>
>>
>>
>>
>> --
>> Sent from:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>
>

Re: Reconstruct object through partial select query

Posted by Fabian Hueske <fh...@gmail.com>.
Hi,

you can use the value construction function ROW to create a nested row (or
object).
However, you have to explicitly reference all attributes that you will add.

If you have a table Cars with (year, modelName) a query could look like
this:

SELECT
  ROW(year, modelName) AS car,
  enrich(year, modelName) AS tags
FROM Cars;

Handling many attributes is always a bit painful in SQL.
There is an effort to make the Table API easier to use for these use cases
(for example Column Operations [1]).

Best, Fabian

[1] https://issues.apache.org/jira/browse/FLINK-11967



Am Do., 9. Mai 2019 um 01:44 Uhr schrieb shkob1 <shahar.kobrinsky@gmail.com
>:

> Just to be more clear on my goal -
> Im trying to enrich the incoming stream with some meaningful tags based on
> conditions from the event itself.
> So the input stream could be an event looks like:
> Class Car {
>   int year;
>   String modelName;
> }
>
> i will have a config that are defining tags as:
> "NiceCar" -> "year > 2015 AND position("Luxury" in modelName) > 0"
>
> So ideally my output will be in the structure of
>
> Class TaggedEvent<Car> {
>    Car origin;
>    String[] tags;
> }
>
>
>
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>

Re: Reconstruct object through partial select query

Posted by shkob1 <sh...@gmail.com>.
Just to be more clear on my goal -
Im trying to enrich the incoming stream with some meaningful tags based on
conditions from the event itself.
So the input stream could be an event looks like:
Class Car {
  int year;
  String modelName;
} 

i will have a config that are defining tags as:
"NiceCar" -> "year > 2015 AND position("Luxury" in modelName) > 0"

So ideally my output will be in the structure of

Class TaggedEvent<Car> {
   Car origin;
   String[] tags;
}






--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/