You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by shkob1 <sh...@gmail.com> on 2019/03/06 20:52:41 UTC

Schema Evolution on Dynamic Schema

Hey,

My job is built on SQL that is injected as an input to the job. so lets take
an example of 

Select a,max(b) as MaxB,max(c) as MaxC FROM registered_table GROUP BY a

(side note: in order for the state not to grow indefinitely i'm transforming
to a retracted stream and filtering based on a custom trigger)

In order to get the output as a Json format i basically created a way to
dynamically generate a class and registering it to the class loader, so when
transforming to the retracted stream im doing something like:

Table result = tableEnv.sqlQuery(sqlExpression);
tableEnv.toRetractStream(result, Row.class, config)
.filter(tuple -> tuple.f0)
.map(new RowToDynamicClassMapper(sqlSelectFields))
.addSink(..)

This actually works pretty good (though i do need to make sure to register
the dynamic class to the class loader whenever the state is loaded)

Im now looking into "schema evolution" - which basically means what happens
when the query is changed (say max(c) is removed, and maybe max(d) is
added). I dont know if that fits the classic "schema evolution" feature or
should that be thought about differently. Would be happy to get some
thoughts.

Thanks!







--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Schema Evolution on Dynamic Schema

Posted by shkob1 <sh...@gmail.com>.
Sorry to flood this thread, but keeping my experiments:

so far i've been using retract to a Row and then mapping to a dynamic pojo
that is created (using ByteBuddy) according to the select fields in the SQL.
Considering the error I'm trying now to remove thr usage in Row and use the
dynamic type directly when converting query result table to a retracted
stream.

However, since i dont have the compile time knowledge of the select field
types (the created pojo has them as "Object") i don't think i can create a
Kryo serializer for them (i guess by using Row i deferred my problem from
compile time to schema evolution time by using row -> dynamic object
conversion).

So i guess i need to find a solution in a way that i can either:
- figure out how to infer the type of a SQL select field based on the source
table somehow
- OR figure out how to create a (Kryo?) serializer that can convert to the
dynamic object in a similar way to the RowSerializer (supporting Object
fields). 

Would love to hear more thoughts







--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Schema Evolution on Dynamic Schema

Posted by shkob1 <sh...@gmail.com>.
Debugging locally it seems like the state descriptor of "GroupAggregateState"
is creating an additional field (TypleSerializer of SumAccumulator)
serializer within the RowSerializer. Im guessing this is what causing
incompatibility? Is there any work around i can do?



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Schema Evolution on Dynamic Schema

Posted by shkob1 <sh...@gmail.com>.
Hi Fabian,

It seems like it didn't work.
Let me specify what i have done:

i have a SQL that looks something like:
Select a, sum(b), map[ 'sum_c', sum(c), 'sum_d', sum(d)] as my_map FROM... 
GROUP BY a

As you said im preventing keys in the state forever by doing idle state
retention time (+ im transforming to retracted stream along with a custom
trigger that sends the data to the sink).

I tried adding a new item to the map ( say 'sum_e', sum(e) ), cancelled with
savepoint and rerun from that savepoint and got the same error as above
about state incompatibility. 

Why do you think would that happen? 

Thanks
Shahar






--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Schema Evolution on Dynamic Schema

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Shahar,

Thanks!

The approach of the UDAGG would be very manual. You could not reuse the
built-in functions.
There are several ways to achieve this. One approach could be to have a
map-based UDAGG for each type of aggregation that you'd like to support
(SUM, COUNT, ...)
Let's say we have a sumMap function, it could have a MAP(String, Double) as
input parameter and produce a MAP(String, Double) as result. Internally,
the function would create and maintain a sum aggregate for each unique
String key of the map.
The same could be done for countMap, minMap, etc.
Since the accumulator of the UDAGGs would be a map, it should be state
compatible and support a growing number of aggregates. I would not be
easily possible (without injecting marker records) to delete aggregates.

I don't think this would be very efficient, but should work.

Best, Fabian

Am Di., 9. Apr. 2019 um 01:35 Uhr schrieb Shahar Cizer Kobrinsky <
shahar.kobrinsky@gmail.com>:

> That makes sense Fabian!
> So I want to make sure I fully understand how this should look.
> Would the same expression look like:
>
> custom_groupby(my_group_fields, map[ 'a', sum(a)...])
> ?
> Will I be able to use the builtin aggregation function internally such as
> sum/avg etc? or would I need to reimplement all such functions?
> In terms of schema evolution, if these are implemented as a map state,
> will I be OK as new items are added to that map?
>
> Thanks again, and congrats on an awesome conference, I had learned a lot
> Shahar
>
> From: Fabian Hueske
> Sent: Monday, April 8, 02:54
> Subject: Re: Schema Evolution on Dynamic Schema
> To: Shahar Cizer Kobrinsky
> Cc: Rong Rong, user
>
>
> Hi Shahar,
>
> Sorry for the late response.
>
> The problem is not with the type of the retract stream, but with the GROUP
> BY aggregation operator.
> The optimizer is converting the plan into an aggregation operator that
> computes all aggregates followed by a projection that inserts the
> aggregation results into a MAP type.
> The problem is the state of the aggregation operator. By adding a new
> field to the map, the state of the operator changes and you cannot restore
> it.
> The only workaround that I can think of would be to implement a
> user-defined aggregation function [1] that performs all aggregations
> internally and manually maintain state compatibility for the accumulator of
> the UDAGG.
>
> Best,
> Fabian
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/udfs.html#aggregation-functions
>
> Am Do., 28. März 2019 um 22:28 Uhr schrieb Shahar Cizer Kobrinsky <
> shahar.kobrinsky@gmail.com>:
>
> Hmm kinda stuck here. Seems like SQL Group by is translated to a
> *GroupAggProcessFunction* which stores a state for every aggregation
> element (thus flattening the map items for state store). Seems like there's
> no way around it. Am i wrong? is there any way to evolve the map elements
> when doing *SELECT map['a', sum(a), 'b', sum(b).. ] FROM.. group by ..  *?
>
> On Wed, Mar 20, 2019 at 2:00 AM Fabian Hueske <fh...@gmail.com> wrote:
>
> Hi,
>
> I think this would work.
> However, you should be aware that all keys are kept forever in state
> (unless you configure idle state retention time [1]).
> This includes previous versions of keys.
>
> Also note that we are not guaranteeing savepoint compatibility across
> Flink versions yet.
> If the state of the aggregation operator changes in a later version (say
> Flink 1.9.x), it might not be possible to migrate to a later Flink version.
> Compatibility for bugfix releases (1.8.0 -> 1.8.1) is of course provided.
>
> Best,
> Fabian
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/table/streaming/query_configuration.html#idle-state-retention-time
>
> Am Di., 19. März 2019 um 22:27 Uhr schrieb Shahar Cizer Kobrinsky <
> shahar.kobrinsky@gmail.com>:
>
> My bad. it actually did work with
> Select a, map['sumB', sum(b) ,'sumC' , sum(c) ) as metric_map
> group by a
>
> do you think thats OK as a workaround? main schema should be changed that
> way - only keys in the map
>
> On Tue, Mar 19, 2019 at 11:50 AM Shahar Cizer Kobrinsky <
> shahar.kobrinsky@gmail.com> wrote:
>
> Thanks Fabian,
>
> Im thinking about how to work around that issue and one thing that came to
> my mind is to create a map that holds keys & values that can be edited
> without changing the schema, though im thinking how to implement it in
> Calcite.
> Considering the following original SQL in which "metrics" can be
> added/deleted/renamed
> Select a, sum(b) as metric_sum_c ,sum(c) as metric_sum_c
> Group by a
>
> im looking both at json_objectagg & map to change it but it seems that
> json_objectagg is on a later calcite version and map doesnt work for me.
> Trying something like
> Select a, map(sum(b) as metric_sum_c ,sum(c) as metric_sum_c) as metric_map
> group by a
>
> results with "Non-query expression encountered in illegal context"
> is my train of thought the right one? if so, do i have a mistake in the
> way im trying to implement it?
>
> Thanks!
>
>
>
>
>
>
>
> On Tue, Mar 19, 2019 at 2:03 AM Fabian Hueske <fh...@gmail.com> wrote:
>
> Hi,
>
> Restarting a changed query from a savepoint is currently not supported.
> In general this is a very difficult problem as new queries might result in
> completely different execution plans.
> The special case of adding and removing aggregates is easier to solve, but
> the schema of the stored state changes and we would need to analyze the
> previous and current query and generate compatible serializers.
> So far we did not explore this rabbit hole.
>
> Also, starting a different query from a savepoint can also lead to weird
> result semantics.
> I'd recommend to bootstrap the state of the new query from scatch.
>
> Best, Fabian
>
>
>
> Am Mo., 18. März 2019 um 20:02 Uhr schrieb Shahar Cizer Kobrinsky <
> shahar.kobrinsky@gmail.com>:
>
> Or is it the SQL state that is incompatible.. ?
>
> On Mon, Mar 18, 2019 at 11:44 AM Shahar Cizer Kobrinsky <
> shahar.kobrinsky@gmail.com> wrote:
>
> Thanks Guys,
>
> I actually got an error now adding some fields into the select statement:
>
> java.lang.RuntimeException: Error while getting state
> at
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:62)
> at
> org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:135)
> at
> org.apache.flink.table.runtime.aggregate.GroupAggProcessFunction.open(GroupAggProcessFunction.scala:74)
> at
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
> at
> org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.open(LegacyKeyedProcessOperator.java:60)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.StateMigrationException: For heap
> backends, the new state serializer must not be incompatible.
> at
> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.tryRegisterStateTable(HeapKeyedStateBackend.java:301)
> at
> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.createInternalState(HeapKeyedStateBackend.java:341)
> at
> org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
> at
> org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:63)
> at
> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:241)
> at
> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:290)
> at
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124)
> at
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60)
> ... 9 more
>
> Does that mean i should move from having a Pojo storing the result of the
> SQL retracted stream to Avro? trying to understand how to mitigate it.
>
> Thanks
>
> On Sat, Mar 9, 2019 at 4:41 PM Rong Rong <wa...@gmail.com> wrote:
>
> Hi Shahar,
>
> From my understanding, if you use "groupby" withAggregateFunctions, they
> save the accumulators to SQL internal states: which are invariant from your
> input schema. Based on what you described I think that's why it is fine for
> recovering from existing state.
> I think one confusion you might have is the "toRetractStream" syntax. This
> actually passes the "retracting" flag to the Flink planner to indicate how
> the DataStream operator gets generated based on your SQL.
>
> So in my understanding, there's really no "state" associated with the
> "retracting stream", but rather associated with the generated operators.
> However, I am not expert in Table/SQL state recovery: I recall there were
> an open JIRA[1] that might be related to your question regarding SQL/Table
> generated operator recovery. Maybe @Fabian can provide more insight here?
>
> Regarding the rest of the pipeline, both "filter" and "map" operators are
> stateless; and sink state recovery depends on what you do.
>
> --
> Rong
>
> [1] https://issues.apache.org/jira/browse/FLINK-6966
>
> On Fri, Mar 8, 2019 at 12:07 PM shkob1 <sh...@gmail.com> wrote:
>
> Thanks Rong,
>
> I have made some quick test changing the SQL select (adding a select field
> in the middle) and reran the job from a savepoint and it worked without any
> errors. I want to make sure i understand how at what point the state is
> stored and how does it work.
>
> Let's simplify the scenario and forget my specific case of dynamically
> generated pojo. let's focus on generic steps of:
> Source->register table->SQL select and group by session->retracted stream
> (Row)->transformToPojo (Custom Map function) ->pushToSink
>
> And let's assume the SQL select is changed (a field is added somewhere in
> the middle of the select field).
> So:
> We had intermediate results that are in the old format that are loaded from
> state to the new Row object in the retracted stream. is that an accurate
> statement? at what operator/format is the state stored in this case? is it
> the SQL result/Row? is it the Pojo? as this scenario does not fail for me
> im
> trying to understand how/where it is handled in Flink?
>
>
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>
>
>
>

Re: Schema Evolution on Dynamic Schema

Posted by Shahar Cizer Kobrinsky <sh...@gmail.com>.
That makes sense Fabian!
So I want to make sure I fully understand how this should look.
Would the same expression look like:

custom_groupby(my_group_fields, map[ 'a', sum(a)...])
?
Will I be able to use the builtin aggregation function internally such as sum/avg etc? or would I need to reimplement all such functions?
In terms of schema evolution, if these are implemented as a map state, will I be OK as new items are added to that map?

Thanks again, and congrats on an awesome conference, I had learned a lot
Shahar

From: Fabian Hueske
Sent: Monday, April 8, 02:54
Subject: Re: Schema Evolution on Dynamic Schema
To: Shahar Cizer Kobrinsky
Cc: Rong Rong, user


Hi Shahar,

Sorry for the late response.

The problem is not with the type of the retract stream, but with the GROUP BY aggregation operator.
The optimizer is converting the plan into an aggregation operator that computes all aggregates followed by a projection that inserts the aggregation results into a MAP type.
The problem is the state of the aggregation operator. By adding a new field to the map, the state of the operator changes and you cannot restore it.
The only workaround that I can think of would be to implement a user-defined aggregation function [1] that performs all aggregations internally and manually maintain state compatibility for the accumulator of the UDAGG.

Best,
Fabian

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/udfs.html#aggregation-functions

Am Do., 28. März 2019 um 22:28 Uhr schrieb Shahar Cizer Kobrinsky <sh...@gmail.com>>:
Hmm kinda stuck here. Seems like SQL Group by is translated to a GroupAggProcessFunction which stores a state for every aggregation element (thus flattening the map items for state store). Seems like there's no way around it. Am i wrong? is there any way to evolve the map elements when doing SELECT map['a', sum(a), 'b', sum(b).. ] FROM.. group by ..  ?

On Wed, Mar 20, 2019 at 2:00 AM Fabian Hueske <fh...@gmail.com>> wrote:
Hi,

I think this would work.
However, you should be aware that all keys are kept forever in state (unless you configure idle state retention time [1]).
This includes previous versions of keys.

Also note that we are not guaranteeing savepoint compatibility across Flink versions yet.
If the state of the aggregation operator changes in a later version (say Flink 1.9.x), it might not be possible to migrate to a later Flink version.
Compatibility for bugfix releases (1.8.0 -> 1.8.1) is of course provided.

Best,
Fabian

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/table/streaming/query_configuration.html#idle-state-retention-time

Am Di., 19. März 2019 um 22:27 Uhr schrieb Shahar Cizer Kobrinsky <sh...@gmail.com>>:
My bad. it actually did work with
Select a, map['sumB', sum(b) ,'sumC' , sum(c) ) as metric_map
group by a

do you think thats OK as a workaround? main schema should be changed that way - only keys in the map

On Tue, Mar 19, 2019 at 11:50 AM Shahar Cizer Kobrinsky <sh...@gmail.com>> wrote:
Thanks Fabian,

Im thinking about how to work around that issue and one thing that came to my mind is to create a map that holds keys & values that can be edited without changing the schema, though im thinking how to implement it in Calcite.
Considering the following original SQL in which "metrics" can be added/deleted/renamed
Select a, sum(b) as metric_sum_c ,sum(c) as metric_sum_c
Group by a

im looking both at json_objectagg & map to change it but it seems that json_objectagg is on a later calcite version and map doesnt work for me.
Trying something like
Select a, map(sum(b) as metric_sum_c ,sum(c) as metric_sum_c) as metric_map
group by a

results with "Non-query expression encountered in illegal context"
is my train of thought the right one? if so, do i have a mistake in the way im trying to implement it?

Thanks!







On Tue, Mar 19, 2019 at 2:03 AM Fabian Hueske <fh...@gmail.com>> wrote:
Hi,

Restarting a changed query from a savepoint is currently not supported.
In general this is a very difficult problem as new queries might result in completely different execution plans.
The special case of adding and removing aggregates is easier to solve, but the schema of the stored state changes and we would need to analyze the previous and current query and generate compatible serializers.
So far we did not explore this rabbit hole.

Also, starting a different query from a savepoint can also lead to weird result semantics.
I'd recommend to bootstrap the state of the new query from scatch.

Best, Fabian



Am Mo., 18. März 2019 um 20:02 Uhr schrieb Shahar Cizer Kobrinsky <sh...@gmail.com>>:
Or is it the SQL state that is incompatible.. ?

On Mon, Mar 18, 2019 at 11:44 AM Shahar Cizer Kobrinsky <sh...@gmail.com>> wrote:
Thanks Guys,

I actually got an error now adding some fields into the select statement:

java.lang.RuntimeException: Error while getting state
at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:62)
at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:135)
at org.apache.flink.table.runtime.aggregate.GroupAggProcessFunction.open(GroupAggProcessFunction.scala:74)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
at org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.open(LegacyKeyedProcessOperator.java:60)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.StateMigrationException: For heap backends, the new state serializer must not be incompatible.
at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.tryRegisterStateTable(HeapKeyedStateBackend.java:301)
at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.createInternalState(HeapKeyedStateBackend.java:341)
at org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
at org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:63)
at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:241)
at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:290)
at org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124)
at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60)
... 9 more

Does that mean i should move from having a Pojo storing the result of the SQL retracted stream to Avro? trying to understand how to mitigate it.

Thanks

On Sat, Mar 9, 2019 at 4:41 PM Rong Rong <wa...@gmail.com>> wrote:
Hi Shahar,

From my understanding, if you use "groupby" withAggregateFunctions, they save the accumulators to SQL internal states: which are invariant from your input schema. Based on what you described I think that's why it is fine for recovering from existing state.
I think one confusion you might have is the "toRetractStream" syntax. This actually passes the "retracting" flag to the Flink planner to indicate how the DataStream operator gets generated based on your SQL.

So in my understanding, there's really no "state" associated with the "retracting stream", but rather associated with the generated operators.
However, I am not expert in Table/SQL state recovery: I recall there were an open JIRA[1] that might be related to your question regarding SQL/Table generated operator recovery. Maybe @Fabian can provide more insight here?

Regarding the rest of the pipeline, both "filter" and "map" operators are stateless; and sink state recovery depends on what you do.

--
Rong

[1] https://issues.apache.org/jira/browse/FLINK-6966

On Fri, Mar 8, 2019 at 12:07 PM shkob1 <sh...@gmail.com>> wrote:
Thanks Rong,

I have made some quick test changing the SQL select (adding a select field
in the middle) and reran the job from a savepoint and it worked without any
errors. I want to make sure i understand how at what point the state is
stored and how does it work.

Let's simplify the scenario and forget my specific case of dynamically
generated pojo. let's focus on generic steps of:
Source->register table->SQL select and group by session->retracted stream
(Row)->transformToPojo (Custom Map function) ->pushToSink

And let's assume the SQL select is changed (a field is added somewhere in
the middle of the select field).
So:
We had intermediate results that are in the old format that are loaded from
state to the new Row object in the retracted stream. is that an accurate
statement? at what operator/format is the state stored in this case? is it
the SQL result/Row? is it the Pojo? as this scenario does not fail for me im
trying to understand how/where it is handled in Flink?





--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: Schema Evolution on Dynamic Schema

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Shahar,

Sorry for the late response.

The problem is not with the type of the retract stream, but with the GROUP
BY aggregation operator.
The optimizer is converting the plan into an aggregation operator that
computes all aggregates followed by a projection that inserts the
aggregation results into a MAP type.
The problem is the state of the aggregation operator. By adding a new field
to the map, the state of the operator changes and you cannot restore it.
The only workaround that I can think of would be to implement a
user-defined aggregation function [1] that performs all aggregations
internally and manually maintain state compatibility for the accumulator of
the UDAGG.

Best,
Fabian

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/udfs.html#aggregation-functions

Am Do., 28. März 2019 um 22:28 Uhr schrieb Shahar Cizer Kobrinsky <
shahar.kobrinsky@gmail.com>:

> Hmm kinda stuck here. Seems like SQL Group by is translated to a
> *GroupAggProcessFunction* which stores a state for every aggregation
> element (thus flattening the map items for state store). Seems like there's
> no way around it. Am i wrong? is there any way to evolve the map elements
> when doing *SELECT map['a', sum(a), 'b', sum(b).. ] FROM.. group by ..  *?
>
> On Wed, Mar 20, 2019 at 2:00 AM Fabian Hueske <fh...@gmail.com> wrote:
>
>> Hi,
>>
>> I think this would work.
>> However, you should be aware that all keys are kept forever in state
>> (unless you configure idle state retention time [1]).
>> This includes previous versions of keys.
>>
>> Also note that we are not guaranteeing savepoint compatibility across
>> Flink versions yet.
>> If the state of the aggregation operator changes in a later version (say
>> Flink 1.9.x), it might not be possible to migrate to a later Flink version.
>> Compatibility for bugfix releases (1.8.0 -> 1.8.1) is of course provided.
>>
>> Best,
>> Fabian
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/table/streaming/query_configuration.html#idle-state-retention-time
>>
>> Am Di., 19. März 2019 um 22:27 Uhr schrieb Shahar Cizer Kobrinsky <
>> shahar.kobrinsky@gmail.com>:
>>
>>> My bad. it actually did work with
>>> Select a, map['sumB', sum(b) ,'sumC' , sum(c) ) as metric_map
>>> group by a
>>>
>>> do you think thats OK as a workaround? main schema should be changed
>>> that way - only keys in the map
>>>
>>> On Tue, Mar 19, 2019 at 11:50 AM Shahar Cizer Kobrinsky <
>>> shahar.kobrinsky@gmail.com> wrote:
>>>
>>>> Thanks Fabian,
>>>>
>>>> Im thinking about how to work around that issue and one thing that came
>>>> to my mind is to create a map that holds keys & values that can be edited
>>>> without changing the schema, though im thinking how to implement it in
>>>> Calcite.
>>>> Considering the following original SQL in which "metrics" can be
>>>> added/deleted/renamed
>>>> Select a, sum(b) as metric_sum_c ,sum(c) as metric_sum_c
>>>> Group by a
>>>>
>>>> im looking both at json_objectagg & map to change it but it seems that
>>>> json_objectagg is on a later calcite version and map doesnt work for me.
>>>> Trying something like
>>>> Select a, map(sum(b) as metric_sum_c ,sum(c) as metric_sum_c) as
>>>> metric_map
>>>> group by a
>>>>
>>>> results with "Non-query expression encountered in illegal context"
>>>> is my train of thought the right one? if so, do i have a mistake in the
>>>> way im trying to implement it?
>>>>
>>>> Thanks!
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On Tue, Mar 19, 2019 at 2:03 AM Fabian Hueske <fh...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> Restarting a changed query from a savepoint is currently not supported.
>>>>> In general this is a very difficult problem as new queries might
>>>>> result in completely different execution plans.
>>>>> The special case of adding and removing aggregates is easier to solve,
>>>>> but the schema of the stored state changes and we would need to analyze the
>>>>> previous and current query and generate compatible serializers.
>>>>> So far we did not explore this rabbit hole.
>>>>>
>>>>> Also, starting a different query from a savepoint can also lead to
>>>>> weird result semantics.
>>>>> I'd recommend to bootstrap the state of the new query from scatch.
>>>>>
>>>>> Best, Fabian
>>>>>
>>>>>
>>>>>
>>>>> Am Mo., 18. März 2019 um 20:02 Uhr schrieb Shahar Cizer Kobrinsky <
>>>>> shahar.kobrinsky@gmail.com>:
>>>>>
>>>>>> Or is it the SQL state that is incompatible.. ?
>>>>>>
>>>>>> On Mon, Mar 18, 2019 at 11:44 AM Shahar Cizer Kobrinsky <
>>>>>> shahar.kobrinsky@gmail.com> wrote:
>>>>>>
>>>>>>> Thanks Guys,
>>>>>>>
>>>>>>> I actually got an error now adding some fields into the select
>>>>>>> statement:
>>>>>>>
>>>>>>> java.lang.RuntimeException: Error while getting state
>>>>>>> at
>>>>>>> org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:62)
>>>>>>> at
>>>>>>> org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:135)
>>>>>>> at
>>>>>>> org.apache.flink.table.runtime.aggregate.GroupAggProcessFunction.open(GroupAggProcessFunction.scala:74)
>>>>>>> at
>>>>>>> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>>>>>>> at
>>>>>>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>>>>>>> at
>>>>>>> org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.open(LegacyKeyedProcessOperator.java:60)
>>>>>>> at
>>>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
>>>>>>> at
>>>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
>>>>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
>>>>>>> at java.lang.Thread.run(Thread.java:748)
>>>>>>> Caused by: org.apache.flink.util.StateMigrationException: For heap
>>>>>>> backends, the new state serializer must not be incompatible.
>>>>>>> at
>>>>>>> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.tryRegisterStateTable(HeapKeyedStateBackend.java:301)
>>>>>>> at
>>>>>>> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.createInternalState(HeapKeyedStateBackend.java:341)
>>>>>>> at
>>>>>>> org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
>>>>>>> at
>>>>>>> org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:63)
>>>>>>> at
>>>>>>> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:241)
>>>>>>> at
>>>>>>> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:290)
>>>>>>> at
>>>>>>> org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124)
>>>>>>> at
>>>>>>> org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60)
>>>>>>> ... 9 more
>>>>>>>
>>>>>>> Does that mean i should move from having a Pojo storing the result
>>>>>>> of the SQL retracted stream to Avro? trying to understand how to mitigate
>>>>>>> it.
>>>>>>>
>>>>>>> Thanks
>>>>>>>
>>>>>>> On Sat, Mar 9, 2019 at 4:41 PM Rong Rong <wa...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Shahar,
>>>>>>>>
>>>>>>>> From my understanding, if you use "groupby" withAggregateFunctions,
>>>>>>>> they save the accumulators to SQL internal states: which are invariant from
>>>>>>>> your input schema. Based on what you described I think that's why it is
>>>>>>>> fine for recovering from existing state.
>>>>>>>> I think one confusion you might have is the "toRetractStream"
>>>>>>>> syntax. This actually passes the "retracting" flag to the Flink planner to
>>>>>>>> indicate how the DataStream operator gets generated based on your SQL.
>>>>>>>>
>>>>>>>> So in my understanding, there's really no "state" associated with
>>>>>>>> the "retracting stream", but rather associated with the generated
>>>>>>>> operators.
>>>>>>>> However, I am not expert in Table/SQL state recovery: I recall
>>>>>>>> there were an open JIRA[1] that might be related to your question regarding
>>>>>>>> SQL/Table generated operator recovery. Maybe @Fabian can provide more
>>>>>>>> insight here?
>>>>>>>>
>>>>>>>> Regarding the rest of the pipeline, both "filter" and "map"
>>>>>>>> operators are stateless; and sink state recovery depends on what you do.
>>>>>>>>
>>>>>>>> --
>>>>>>>> Rong
>>>>>>>>
>>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-6966
>>>>>>>>
>>>>>>>> On Fri, Mar 8, 2019 at 12:07 PM shkob1 <sh...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Thanks Rong,
>>>>>>>>>
>>>>>>>>> I have made some quick test changing the SQL select (adding a
>>>>>>>>> select field
>>>>>>>>> in the middle) and reran the job from a savepoint and it worked
>>>>>>>>> without any
>>>>>>>>> errors. I want to make sure i understand how at what point the
>>>>>>>>> state is
>>>>>>>>> stored and how does it work.
>>>>>>>>>
>>>>>>>>> Let's simplify the scenario and forget my specific case of
>>>>>>>>> dynamically
>>>>>>>>> generated pojo. let's focus on generic steps of:
>>>>>>>>> Source->register table->SQL select and group by session->retracted
>>>>>>>>> stream
>>>>>>>>> (Row)->transformToPojo (Custom Map function) ->pushToSink
>>>>>>>>>
>>>>>>>>> And let's assume the SQL select is changed (a field is added
>>>>>>>>> somewhere in
>>>>>>>>> the middle of the select field).
>>>>>>>>> So:
>>>>>>>>> We had intermediate results that are in the old format that are
>>>>>>>>> loaded from
>>>>>>>>> state to the new Row object in the retracted stream. is that an
>>>>>>>>> accurate
>>>>>>>>> statement? at what operator/format is the state stored in this
>>>>>>>>> case? is it
>>>>>>>>> the SQL result/Row? is it the Pojo? as this scenario does not fail
>>>>>>>>> for me im
>>>>>>>>> trying to understand how/where it is handled in Flink?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Sent from:
>>>>>>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>>>>>>>>
>>>>>>>>

Re: Schema Evolution on Dynamic Schema

Posted by Shahar Cizer Kobrinsky <sh...@gmail.com>.
Hmm kinda stuck here. Seems like SQL Group by is translated to a
*GroupAggProcessFunction* which stores a state for every aggregation
element (thus flattening the map items for state store). Seems like there's
no way around it. Am i wrong? is there any way to evolve the map elements
when doing *SELECT map['a', sum(a), 'b', sum(b).. ] FROM.. group by ..  *?

On Wed, Mar 20, 2019 at 2:00 AM Fabian Hueske <fh...@gmail.com> wrote:

> Hi,
>
> I think this would work.
> However, you should be aware that all keys are kept forever in state
> (unless you configure idle state retention time [1]).
> This includes previous versions of keys.
>
> Also note that we are not guaranteeing savepoint compatibility across
> Flink versions yet.
> If the state of the aggregation operator changes in a later version (say
> Flink 1.9.x), it might not be possible to migrate to a later Flink version.
> Compatibility for bugfix releases (1.8.0 -> 1.8.1) is of course provided.
>
> Best,
> Fabian
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/table/streaming/query_configuration.html#idle-state-retention-time
>
> Am Di., 19. März 2019 um 22:27 Uhr schrieb Shahar Cizer Kobrinsky <
> shahar.kobrinsky@gmail.com>:
>
>> My bad. it actually did work with
>> Select a, map['sumB', sum(b) ,'sumC' , sum(c) ) as metric_map
>> group by a
>>
>> do you think thats OK as a workaround? main schema should be changed that
>> way - only keys in the map
>>
>> On Tue, Mar 19, 2019 at 11:50 AM Shahar Cizer Kobrinsky <
>> shahar.kobrinsky@gmail.com> wrote:
>>
>>> Thanks Fabian,
>>>
>>> Im thinking about how to work around that issue and one thing that came
>>> to my mind is to create a map that holds keys & values that can be edited
>>> without changing the schema, though im thinking how to implement it in
>>> Calcite.
>>> Considering the following original SQL in which "metrics" can be
>>> added/deleted/renamed
>>> Select a, sum(b) as metric_sum_c ,sum(c) as metric_sum_c
>>> Group by a
>>>
>>> im looking both at json_objectagg & map to change it but it seems that
>>> json_objectagg is on a later calcite version and map doesnt work for me.
>>> Trying something like
>>> Select a, map(sum(b) as metric_sum_c ,sum(c) as metric_sum_c) as
>>> metric_map
>>> group by a
>>>
>>> results with "Non-query expression encountered in illegal context"
>>> is my train of thought the right one? if so, do i have a mistake in the
>>> way im trying to implement it?
>>>
>>> Thanks!
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> On Tue, Mar 19, 2019 at 2:03 AM Fabian Hueske <fh...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> Restarting a changed query from a savepoint is currently not supported.
>>>> In general this is a very difficult problem as new queries might result
>>>> in completely different execution plans.
>>>> The special case of adding and removing aggregates is easier to solve,
>>>> but the schema of the stored state changes and we would need to analyze the
>>>> previous and current query and generate compatible serializers.
>>>> So far we did not explore this rabbit hole.
>>>>
>>>> Also, starting a different query from a savepoint can also lead to
>>>> weird result semantics.
>>>> I'd recommend to bootstrap the state of the new query from scatch.
>>>>
>>>> Best, Fabian
>>>>
>>>>
>>>>
>>>> Am Mo., 18. März 2019 um 20:02 Uhr schrieb Shahar Cizer Kobrinsky <
>>>> shahar.kobrinsky@gmail.com>:
>>>>
>>>>> Or is it the SQL state that is incompatible.. ?
>>>>>
>>>>> On Mon, Mar 18, 2019 at 11:44 AM Shahar Cizer Kobrinsky <
>>>>> shahar.kobrinsky@gmail.com> wrote:
>>>>>
>>>>>> Thanks Guys,
>>>>>>
>>>>>> I actually got an error now adding some fields into the select
>>>>>> statement:
>>>>>>
>>>>>> java.lang.RuntimeException: Error while getting state
>>>>>> at
>>>>>> org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:62)
>>>>>> at
>>>>>> org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:135)
>>>>>> at
>>>>>> org.apache.flink.table.runtime.aggregate.GroupAggProcessFunction.open(GroupAggProcessFunction.scala:74)
>>>>>> at
>>>>>> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>>>>>> at
>>>>>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>>>>>> at
>>>>>> org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.open(LegacyKeyedProcessOperator.java:60)
>>>>>> at
>>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
>>>>>> at
>>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
>>>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
>>>>>> at java.lang.Thread.run(Thread.java:748)
>>>>>> Caused by: org.apache.flink.util.StateMigrationException: For heap
>>>>>> backends, the new state serializer must not be incompatible.
>>>>>> at
>>>>>> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.tryRegisterStateTable(HeapKeyedStateBackend.java:301)
>>>>>> at
>>>>>> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.createInternalState(HeapKeyedStateBackend.java:341)
>>>>>> at
>>>>>> org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
>>>>>> at
>>>>>> org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:63)
>>>>>> at
>>>>>> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:241)
>>>>>> at
>>>>>> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:290)
>>>>>> at
>>>>>> org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124)
>>>>>> at
>>>>>> org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60)
>>>>>> ... 9 more
>>>>>>
>>>>>> Does that mean i should move from having a Pojo storing the result of
>>>>>> the SQL retracted stream to Avro? trying to understand how to mitigate it.
>>>>>>
>>>>>> Thanks
>>>>>>
>>>>>> On Sat, Mar 9, 2019 at 4:41 PM Rong Rong <wa...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Shahar,
>>>>>>>
>>>>>>> From my understanding, if you use "groupby" withAggregateFunctions,
>>>>>>> they save the accumulators to SQL internal states: which are invariant from
>>>>>>> your input schema. Based on what you described I think that's why it is
>>>>>>> fine for recovering from existing state.
>>>>>>> I think one confusion you might have is the "toRetractStream"
>>>>>>> syntax. This actually passes the "retracting" flag to the Flink planner to
>>>>>>> indicate how the DataStream operator gets generated based on your SQL.
>>>>>>>
>>>>>>> So in my understanding, there's really no "state" associated with
>>>>>>> the "retracting stream", but rather associated with the generated
>>>>>>> operators.
>>>>>>> However, I am not expert in Table/SQL state recovery: I recall there
>>>>>>> were an open JIRA[1] that might be related to your question regarding
>>>>>>> SQL/Table generated operator recovery. Maybe @Fabian can provide more
>>>>>>> insight here?
>>>>>>>
>>>>>>> Regarding the rest of the pipeline, both "filter" and "map"
>>>>>>> operators are stateless; and sink state recovery depends on what you do.
>>>>>>>
>>>>>>> --
>>>>>>> Rong
>>>>>>>
>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-6966
>>>>>>>
>>>>>>> On Fri, Mar 8, 2019 at 12:07 PM shkob1 <sh...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Thanks Rong,
>>>>>>>>
>>>>>>>> I have made some quick test changing the SQL select (adding a
>>>>>>>> select field
>>>>>>>> in the middle) and reran the job from a savepoint and it worked
>>>>>>>> without any
>>>>>>>> errors. I want to make sure i understand how at what point the
>>>>>>>> state is
>>>>>>>> stored and how does it work.
>>>>>>>>
>>>>>>>> Let's simplify the scenario and forget my specific case of
>>>>>>>> dynamically
>>>>>>>> generated pojo. let's focus on generic steps of:
>>>>>>>> Source->register table->SQL select and group by session->retracted
>>>>>>>> stream
>>>>>>>> (Row)->transformToPojo (Custom Map function) ->pushToSink
>>>>>>>>
>>>>>>>> And let's assume the SQL select is changed (a field is added
>>>>>>>> somewhere in
>>>>>>>> the middle of the select field).
>>>>>>>> So:
>>>>>>>> We had intermediate results that are in the old format that are
>>>>>>>> loaded from
>>>>>>>> state to the new Row object in the retracted stream. is that an
>>>>>>>> accurate
>>>>>>>> statement? at what operator/format is the state stored in this
>>>>>>>> case? is it
>>>>>>>> the SQL result/Row? is it the Pojo? as this scenario does not fail
>>>>>>>> for me im
>>>>>>>> trying to understand how/where it is handled in Flink?
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> Sent from:
>>>>>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>>>>>>>
>>>>>>>

Re: Schema Evolution on Dynamic Schema

Posted by Fabian Hueske <fh...@gmail.com>.
Hi,

I think this would work.
However, you should be aware that all keys are kept forever in state
(unless you configure idle state retention time [1]).
This includes previous versions of keys.

Also note that we are not guaranteeing savepoint compatibility across Flink
versions yet.
If the state of the aggregation operator changes in a later version (say
Flink 1.9.x), it might not be possible to migrate to a later Flink version.
Compatibility for bugfix releases (1.8.0 -> 1.8.1) is of course provided.

Best,
Fabian

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/table/streaming/query_configuration.html#idle-state-retention-time

Am Di., 19. März 2019 um 22:27 Uhr schrieb Shahar Cizer Kobrinsky <
shahar.kobrinsky@gmail.com>:

> My bad. it actually did work with
> Select a, map['sumB', sum(b) ,'sumC' , sum(c) ) as metric_map
> group by a
>
> do you think thats OK as a workaround? main schema should be changed that
> way - only keys in the map
>
> On Tue, Mar 19, 2019 at 11:50 AM Shahar Cizer Kobrinsky <
> shahar.kobrinsky@gmail.com> wrote:
>
>> Thanks Fabian,
>>
>> Im thinking about how to work around that issue and one thing that came
>> to my mind is to create a map that holds keys & values that can be edited
>> without changing the schema, though im thinking how to implement it in
>> Calcite.
>> Considering the following original SQL in which "metrics" can be
>> added/deleted/renamed
>> Select a, sum(b) as metric_sum_c ,sum(c) as metric_sum_c
>> Group by a
>>
>> im looking both at json_objectagg & map to change it but it seems that
>> json_objectagg is on a later calcite version and map doesnt work for me.
>> Trying something like
>> Select a, map(sum(b) as metric_sum_c ,sum(c) as metric_sum_c) as
>> metric_map
>> group by a
>>
>> results with "Non-query expression encountered in illegal context"
>> is my train of thought the right one? if so, do i have a mistake in the
>> way im trying to implement it?
>>
>> Thanks!
>>
>>
>>
>>
>>
>>
>>
>> On Tue, Mar 19, 2019 at 2:03 AM Fabian Hueske <fh...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> Restarting a changed query from a savepoint is currently not supported.
>>> In general this is a very difficult problem as new queries might result
>>> in completely different execution plans.
>>> The special case of adding and removing aggregates is easier to solve,
>>> but the schema of the stored state changes and we would need to analyze the
>>> previous and current query and generate compatible serializers.
>>> So far we did not explore this rabbit hole.
>>>
>>> Also, starting a different query from a savepoint can also lead to weird
>>> result semantics.
>>> I'd recommend to bootstrap the state of the new query from scatch.
>>>
>>> Best, Fabian
>>>
>>>
>>>
>>> Am Mo., 18. März 2019 um 20:02 Uhr schrieb Shahar Cizer Kobrinsky <
>>> shahar.kobrinsky@gmail.com>:
>>>
>>>> Or is it the SQL state that is incompatible.. ?
>>>>
>>>> On Mon, Mar 18, 2019 at 11:44 AM Shahar Cizer Kobrinsky <
>>>> shahar.kobrinsky@gmail.com> wrote:
>>>>
>>>>> Thanks Guys,
>>>>>
>>>>> I actually got an error now adding some fields into the select
>>>>> statement:
>>>>>
>>>>> java.lang.RuntimeException: Error while getting state
>>>>> at
>>>>> org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:62)
>>>>> at
>>>>> org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:135)
>>>>> at
>>>>> org.apache.flink.table.runtime.aggregate.GroupAggProcessFunction.open(GroupAggProcessFunction.scala:74)
>>>>> at
>>>>> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>>>>> at
>>>>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>>>>> at
>>>>> org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.open(LegacyKeyedProcessOperator.java:60)
>>>>> at
>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
>>>>> at
>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
>>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
>>>>> at java.lang.Thread.run(Thread.java:748)
>>>>> Caused by: org.apache.flink.util.StateMigrationException: For heap
>>>>> backends, the new state serializer must not be incompatible.
>>>>> at
>>>>> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.tryRegisterStateTable(HeapKeyedStateBackend.java:301)
>>>>> at
>>>>> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.createInternalState(HeapKeyedStateBackend.java:341)
>>>>> at
>>>>> org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
>>>>> at
>>>>> org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:63)
>>>>> at
>>>>> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:241)
>>>>> at
>>>>> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:290)
>>>>> at
>>>>> org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124)
>>>>> at
>>>>> org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60)
>>>>> ... 9 more
>>>>>
>>>>> Does that mean i should move from having a Pojo storing the result of
>>>>> the SQL retracted stream to Avro? trying to understand how to mitigate it.
>>>>>
>>>>> Thanks
>>>>>
>>>>> On Sat, Mar 9, 2019 at 4:41 PM Rong Rong <wa...@gmail.com> wrote:
>>>>>
>>>>>> Hi Shahar,
>>>>>>
>>>>>> From my understanding, if you use "groupby" withAggregateFunctions,
>>>>>> they save the accumulators to SQL internal states: which are invariant from
>>>>>> your input schema. Based on what you described I think that's why it is
>>>>>> fine for recovering from existing state.
>>>>>> I think one confusion you might have is the "toRetractStream" syntax.
>>>>>> This actually passes the "retracting" flag to the Flink planner to indicate
>>>>>> how the DataStream operator gets generated based on your SQL.
>>>>>>
>>>>>> So in my understanding, there's really no "state" associated with the
>>>>>> "retracting stream", but rather associated with the generated operators.
>>>>>> However, I am not expert in Table/SQL state recovery: I recall there
>>>>>> were an open JIRA[1] that might be related to your question regarding
>>>>>> SQL/Table generated operator recovery. Maybe @Fabian can provide more
>>>>>> insight here?
>>>>>>
>>>>>> Regarding the rest of the pipeline, both "filter" and "map" operators
>>>>>> are stateless; and sink state recovery depends on what you do.
>>>>>>
>>>>>> --
>>>>>> Rong
>>>>>>
>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-6966
>>>>>>
>>>>>> On Fri, Mar 8, 2019 at 12:07 PM shkob1 <sh...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Thanks Rong,
>>>>>>>
>>>>>>> I have made some quick test changing the SQL select (adding a select
>>>>>>> field
>>>>>>> in the middle) and reran the job from a savepoint and it worked
>>>>>>> without any
>>>>>>> errors. I want to make sure i understand how at what point the state
>>>>>>> is
>>>>>>> stored and how does it work.
>>>>>>>
>>>>>>> Let's simplify the scenario and forget my specific case of
>>>>>>> dynamically
>>>>>>> generated pojo. let's focus on generic steps of:
>>>>>>> Source->register table->SQL select and group by session->retracted
>>>>>>> stream
>>>>>>> (Row)->transformToPojo (Custom Map function) ->pushToSink
>>>>>>>
>>>>>>> And let's assume the SQL select is changed (a field is added
>>>>>>> somewhere in
>>>>>>> the middle of the select field).
>>>>>>> So:
>>>>>>> We had intermediate results that are in the old format that are
>>>>>>> loaded from
>>>>>>> state to the new Row object in the retracted stream. is that an
>>>>>>> accurate
>>>>>>> statement? at what operator/format is the state stored in this case?
>>>>>>> is it
>>>>>>> the SQL result/Row? is it the Pojo? as this scenario does not fail
>>>>>>> for me im
>>>>>>> trying to understand how/where it is handled in Flink?
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Sent from:
>>>>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>>>>>>
>>>>>>

Re: Schema Evolution on Dynamic Schema

Posted by Shahar Cizer Kobrinsky <sh...@gmail.com>.
My bad. it actually did work with
Select a, map['sumB', sum(b) ,'sumC' , sum(c) ) as metric_map
group by a

do you think thats OK as a workaround? main schema should be changed that
way - only keys in the map

On Tue, Mar 19, 2019 at 11:50 AM Shahar Cizer Kobrinsky <
shahar.kobrinsky@gmail.com> wrote:

> Thanks Fabian,
>
> Im thinking about how to work around that issue and one thing that came to
> my mind is to create a map that holds keys & values that can be edited
> without changing the schema, though im thinking how to implement it in
> Calcite.
> Considering the following original SQL in which "metrics" can be
> added/deleted/renamed
> Select a, sum(b) as metric_sum_c ,sum(c) as metric_sum_c
> Group by a
>
> im looking both at json_objectagg & map to change it but it seems that
> json_objectagg is on a later calcite version and map doesnt work for me.
> Trying something like
> Select a, map(sum(b) as metric_sum_c ,sum(c) as metric_sum_c) as metric_map
> group by a
>
> results with "Non-query expression encountered in illegal context"
> is my train of thought the right one? if so, do i have a mistake in the
> way im trying to implement it?
>
> Thanks!
>
>
>
>
>
>
>
> On Tue, Mar 19, 2019 at 2:03 AM Fabian Hueske <fh...@gmail.com> wrote:
>
>> Hi,
>>
>> Restarting a changed query from a savepoint is currently not supported.
>> In general this is a very difficult problem as new queries might result
>> in completely different execution plans.
>> The special case of adding and removing aggregates is easier to solve,
>> but the schema of the stored state changes and we would need to analyze the
>> previous and current query and generate compatible serializers.
>> So far we did not explore this rabbit hole.
>>
>> Also, starting a different query from a savepoint can also lead to weird
>> result semantics.
>> I'd recommend to bootstrap the state of the new query from scatch.
>>
>> Best, Fabian
>>
>>
>>
>> Am Mo., 18. März 2019 um 20:02 Uhr schrieb Shahar Cizer Kobrinsky <
>> shahar.kobrinsky@gmail.com>:
>>
>>> Or is it the SQL state that is incompatible.. ?
>>>
>>> On Mon, Mar 18, 2019 at 11:44 AM Shahar Cizer Kobrinsky <
>>> shahar.kobrinsky@gmail.com> wrote:
>>>
>>>> Thanks Guys,
>>>>
>>>> I actually got an error now adding some fields into the select
>>>> statement:
>>>>
>>>> java.lang.RuntimeException: Error while getting state
>>>> at
>>>> org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:62)
>>>> at
>>>> org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:135)
>>>> at
>>>> org.apache.flink.table.runtime.aggregate.GroupAggProcessFunction.open(GroupAggProcessFunction.scala:74)
>>>> at
>>>> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>>>> at
>>>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>>>> at
>>>> org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.open(LegacyKeyedProcessOperator.java:60)
>>>> at
>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
>>>> at
>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
>>>> at java.lang.Thread.run(Thread.java:748)
>>>> Caused by: org.apache.flink.util.StateMigrationException: For heap
>>>> backends, the new state serializer must not be incompatible.
>>>> at
>>>> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.tryRegisterStateTable(HeapKeyedStateBackend.java:301)
>>>> at
>>>> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.createInternalState(HeapKeyedStateBackend.java:341)
>>>> at
>>>> org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
>>>> at
>>>> org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:63)
>>>> at
>>>> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:241)
>>>> at
>>>> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:290)
>>>> at
>>>> org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124)
>>>> at
>>>> org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60)
>>>> ... 9 more
>>>>
>>>> Does that mean i should move from having a Pojo storing the result of
>>>> the SQL retracted stream to Avro? trying to understand how to mitigate it.
>>>>
>>>> Thanks
>>>>
>>>> On Sat, Mar 9, 2019 at 4:41 PM Rong Rong <wa...@gmail.com> wrote:
>>>>
>>>>> Hi Shahar,
>>>>>
>>>>> From my understanding, if you use "groupby" withAggregateFunctions,
>>>>> they save the accumulators to SQL internal states: which are invariant from
>>>>> your input schema. Based on what you described I think that's why it is
>>>>> fine for recovering from existing state.
>>>>> I think one confusion you might have is the "toRetractStream" syntax.
>>>>> This actually passes the "retracting" flag to the Flink planner to indicate
>>>>> how the DataStream operator gets generated based on your SQL.
>>>>>
>>>>> So in my understanding, there's really no "state" associated with the
>>>>> "retracting stream", but rather associated with the generated operators.
>>>>> However, I am not expert in Table/SQL state recovery: I recall there
>>>>> were an open JIRA[1] that might be related to your question regarding
>>>>> SQL/Table generated operator recovery. Maybe @Fabian can provide more
>>>>> insight here?
>>>>>
>>>>> Regarding the rest of the pipeline, both "filter" and "map" operators
>>>>> are stateless; and sink state recovery depends on what you do.
>>>>>
>>>>> --
>>>>> Rong
>>>>>
>>>>> [1] https://issues.apache.org/jira/browse/FLINK-6966
>>>>>
>>>>> On Fri, Mar 8, 2019 at 12:07 PM shkob1 <sh...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Thanks Rong,
>>>>>>
>>>>>> I have made some quick test changing the SQL select (adding a select
>>>>>> field
>>>>>> in the middle) and reran the job from a savepoint and it worked
>>>>>> without any
>>>>>> errors. I want to make sure i understand how at what point the state
>>>>>> is
>>>>>> stored and how does it work.
>>>>>>
>>>>>> Let's simplify the scenario and forget my specific case of dynamically
>>>>>> generated pojo. let's focus on generic steps of:
>>>>>> Source->register table->SQL select and group by session->retracted
>>>>>> stream
>>>>>> (Row)->transformToPojo (Custom Map function) ->pushToSink
>>>>>>
>>>>>> And let's assume the SQL select is changed (a field is added
>>>>>> somewhere in
>>>>>> the middle of the select field).
>>>>>> So:
>>>>>> We had intermediate results that are in the old format that are
>>>>>> loaded from
>>>>>> state to the new Row object in the retracted stream. is that an
>>>>>> accurate
>>>>>> statement? at what operator/format is the state stored in this case?
>>>>>> is it
>>>>>> the SQL result/Row? is it the Pojo? as this scenario does not fail
>>>>>> for me im
>>>>>> trying to understand how/where it is handled in Flink?
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Sent from:
>>>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>>>>>
>>>>>

Re: Schema Evolution on Dynamic Schema

Posted by Shahar Cizer Kobrinsky <sh...@gmail.com>.
Thanks Fabian,

Im thinking about how to work around that issue and one thing that came to
my mind is to create a map that holds keys & values that can be edited
without changing the schema, though im thinking how to implement it in
Calcite.
Considering the following original SQL in which "metrics" can be
added/deleted/renamed
Select a, sum(b) as metric_sum_c ,sum(c) as metric_sum_c
Group by a

im looking both at json_objectagg & map to change it but it seems that
json_objectagg is on a later calcite version and map doesnt work for me.
Trying something like
Select a, map(sum(b) as metric_sum_c ,sum(c) as metric_sum_c) as metric_map
group by a

results with "Non-query expression encountered in illegal context"
is my train of thought the right one? if so, do i have a mistake in the way
im trying to implement it?

Thanks!







On Tue, Mar 19, 2019 at 2:03 AM Fabian Hueske <fh...@gmail.com> wrote:

> Hi,
>
> Restarting a changed query from a savepoint is currently not supported.
> In general this is a very difficult problem as new queries might result in
> completely different execution plans.
> The special case of adding and removing aggregates is easier to solve, but
> the schema of the stored state changes and we would need to analyze the
> previous and current query and generate compatible serializers.
> So far we did not explore this rabbit hole.
>
> Also, starting a different query from a savepoint can also lead to weird
> result semantics.
> I'd recommend to bootstrap the state of the new query from scatch.
>
> Best, Fabian
>
>
>
> Am Mo., 18. März 2019 um 20:02 Uhr schrieb Shahar Cizer Kobrinsky <
> shahar.kobrinsky@gmail.com>:
>
>> Or is it the SQL state that is incompatible.. ?
>>
>> On Mon, Mar 18, 2019 at 11:44 AM Shahar Cizer Kobrinsky <
>> shahar.kobrinsky@gmail.com> wrote:
>>
>>> Thanks Guys,
>>>
>>> I actually got an error now adding some fields into the select statement:
>>>
>>> java.lang.RuntimeException: Error while getting state
>>> at
>>> org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:62)
>>> at
>>> org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:135)
>>> at
>>> org.apache.flink.table.runtime.aggregate.GroupAggProcessFunction.open(GroupAggProcessFunction.scala:74)
>>> at
>>> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>>> at
>>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>>> at
>>> org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.open(LegacyKeyedProcessOperator.java:60)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
>>> at java.lang.Thread.run(Thread.java:748)
>>> Caused by: org.apache.flink.util.StateMigrationException: For heap
>>> backends, the new state serializer must not be incompatible.
>>> at
>>> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.tryRegisterStateTable(HeapKeyedStateBackend.java:301)
>>> at
>>> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.createInternalState(HeapKeyedStateBackend.java:341)
>>> at
>>> org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
>>> at
>>> org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:63)
>>> at
>>> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:241)
>>> at
>>> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:290)
>>> at
>>> org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124)
>>> at
>>> org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60)
>>> ... 9 more
>>>
>>> Does that mean i should move from having a Pojo storing the result of
>>> the SQL retracted stream to Avro? trying to understand how to mitigate it.
>>>
>>> Thanks
>>>
>>> On Sat, Mar 9, 2019 at 4:41 PM Rong Rong <wa...@gmail.com> wrote:
>>>
>>>> Hi Shahar,
>>>>
>>>> From my understanding, if you use "groupby" withAggregateFunctions,
>>>> they save the accumulators to SQL internal states: which are invariant from
>>>> your input schema. Based on what you described I think that's why it is
>>>> fine for recovering from existing state.
>>>> I think one confusion you might have is the "toRetractStream" syntax.
>>>> This actually passes the "retracting" flag to the Flink planner to indicate
>>>> how the DataStream operator gets generated based on your SQL.
>>>>
>>>> So in my understanding, there's really no "state" associated with the
>>>> "retracting stream", but rather associated with the generated operators.
>>>> However, I am not expert in Table/SQL state recovery: I recall there
>>>> were an open JIRA[1] that might be related to your question regarding
>>>> SQL/Table generated operator recovery. Maybe @Fabian can provide more
>>>> insight here?
>>>>
>>>> Regarding the rest of the pipeline, both "filter" and "map" operators
>>>> are stateless; and sink state recovery depends on what you do.
>>>>
>>>> --
>>>> Rong
>>>>
>>>> [1] https://issues.apache.org/jira/browse/FLINK-6966
>>>>
>>>> On Fri, Mar 8, 2019 at 12:07 PM shkob1 <sh...@gmail.com>
>>>> wrote:
>>>>
>>>>> Thanks Rong,
>>>>>
>>>>> I have made some quick test changing the SQL select (adding a select
>>>>> field
>>>>> in the middle) and reran the job from a savepoint and it worked
>>>>> without any
>>>>> errors. I want to make sure i understand how at what point the state is
>>>>> stored and how does it work.
>>>>>
>>>>> Let's simplify the scenario and forget my specific case of dynamically
>>>>> generated pojo. let's focus on generic steps of:
>>>>> Source->register table->SQL select and group by session->retracted
>>>>> stream
>>>>> (Row)->transformToPojo (Custom Map function) ->pushToSink
>>>>>
>>>>> And let's assume the SQL select is changed (a field is added somewhere
>>>>> in
>>>>> the middle of the select field).
>>>>> So:
>>>>> We had intermediate results that are in the old format that are loaded
>>>>> from
>>>>> state to the new Row object in the retracted stream. is that an
>>>>> accurate
>>>>> statement? at what operator/format is the state stored in this case?
>>>>> is it
>>>>> the SQL result/Row? is it the Pojo? as this scenario does not fail for
>>>>> me im
>>>>> trying to understand how/where it is handled in Flink?
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Sent from:
>>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>>>>
>>>>

Re: Schema Evolution on Dynamic Schema

Posted by Fabian Hueske <fh...@gmail.com>.
Hi,

Restarting a changed query from a savepoint is currently not supported.
In general this is a very difficult problem as new queries might result in
completely different execution plans.
The special case of adding and removing aggregates is easier to solve, but
the schema of the stored state changes and we would need to analyze the
previous and current query and generate compatible serializers.
So far we did not explore this rabbit hole.

Also, starting a different query from a savepoint can also lead to weird
result semantics.
I'd recommend to bootstrap the state of the new query from scatch.

Best, Fabian



Am Mo., 18. März 2019 um 20:02 Uhr schrieb Shahar Cizer Kobrinsky <
shahar.kobrinsky@gmail.com>:

> Or is it the SQL state that is incompatible.. ?
>
> On Mon, Mar 18, 2019 at 11:44 AM Shahar Cizer Kobrinsky <
> shahar.kobrinsky@gmail.com> wrote:
>
>> Thanks Guys,
>>
>> I actually got an error now adding some fields into the select statement:
>>
>> java.lang.RuntimeException: Error while getting state
>> at
>> org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:62)
>> at
>> org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:135)
>> at
>> org.apache.flink.table.runtime.aggregate.GroupAggProcessFunction.open(GroupAggProcessFunction.scala:74)
>> at
>> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>> at
>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>> at
>> org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.open(LegacyKeyedProcessOperator.java:60)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
>> at java.lang.Thread.run(Thread.java:748)
>> Caused by: org.apache.flink.util.StateMigrationException: For heap
>> backends, the new state serializer must not be incompatible.
>> at
>> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.tryRegisterStateTable(HeapKeyedStateBackend.java:301)
>> at
>> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.createInternalState(HeapKeyedStateBackend.java:341)
>> at
>> org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
>> at
>> org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:63)
>> at
>> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:241)
>> at
>> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:290)
>> at
>> org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124)
>> at
>> org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60)
>> ... 9 more
>>
>> Does that mean i should move from having a Pojo storing the result of the
>> SQL retracted stream to Avro? trying to understand how to mitigate it.
>>
>> Thanks
>>
>> On Sat, Mar 9, 2019 at 4:41 PM Rong Rong <wa...@gmail.com> wrote:
>>
>>> Hi Shahar,
>>>
>>> From my understanding, if you use "groupby" withAggregateFunctions, they
>>> save the accumulators to SQL internal states: which are invariant from your
>>> input schema. Based on what you described I think that's why it is fine for
>>> recovering from existing state.
>>> I think one confusion you might have is the "toRetractStream" syntax.
>>> This actually passes the "retracting" flag to the Flink planner to indicate
>>> how the DataStream operator gets generated based on your SQL.
>>>
>>> So in my understanding, there's really no "state" associated with the
>>> "retracting stream", but rather associated with the generated operators.
>>> However, I am not expert in Table/SQL state recovery: I recall there
>>> were an open JIRA[1] that might be related to your question regarding
>>> SQL/Table generated operator recovery. Maybe @Fabian can provide more
>>> insight here?
>>>
>>> Regarding the rest of the pipeline, both "filter" and "map" operators
>>> are stateless; and sink state recovery depends on what you do.
>>>
>>> --
>>> Rong
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-6966
>>>
>>> On Fri, Mar 8, 2019 at 12:07 PM shkob1 <sh...@gmail.com>
>>> wrote:
>>>
>>>> Thanks Rong,
>>>>
>>>> I have made some quick test changing the SQL select (adding a select
>>>> field
>>>> in the middle) and reran the job from a savepoint and it worked without
>>>> any
>>>> errors. I want to make sure i understand how at what point the state is
>>>> stored and how does it work.
>>>>
>>>> Let's simplify the scenario and forget my specific case of dynamically
>>>> generated pojo. let's focus on generic steps of:
>>>> Source->register table->SQL select and group by session->retracted
>>>> stream
>>>> (Row)->transformToPojo (Custom Map function) ->pushToSink
>>>>
>>>> And let's assume the SQL select is changed (a field is added somewhere
>>>> in
>>>> the middle of the select field).
>>>> So:
>>>> We had intermediate results that are in the old format that are loaded
>>>> from
>>>> state to the new Row object in the retracted stream. is that an accurate
>>>> statement? at what operator/format is the state stored in this case? is
>>>> it
>>>> the SQL result/Row? is it the Pojo? as this scenario does not fail for
>>>> me im
>>>> trying to understand how/where it is handled in Flink?
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> --
>>>> Sent from:
>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>>>
>>>

Re: Schema Evolution on Dynamic Schema

Posted by Shahar Cizer Kobrinsky <sh...@gmail.com>.
Or is it the SQL state that is incompatible.. ?

On Mon, Mar 18, 2019 at 11:44 AM Shahar Cizer Kobrinsky <
shahar.kobrinsky@gmail.com> wrote:

> Thanks Guys,
>
> I actually got an error now adding some fields into the select statement:
>
> java.lang.RuntimeException: Error while getting state
> at
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:62)
> at
> org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:135)
> at
> org.apache.flink.table.runtime.aggregate.GroupAggProcessFunction.open(GroupAggProcessFunction.scala:74)
> at
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
> at
> org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.open(LegacyKeyedProcessOperator.java:60)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.StateMigrationException: For heap
> backends, the new state serializer must not be incompatible.
> at
> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.tryRegisterStateTable(HeapKeyedStateBackend.java:301)
> at
> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.createInternalState(HeapKeyedStateBackend.java:341)
> at
> org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
> at
> org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:63)
> at
> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:241)
> at
> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:290)
> at
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124)
> at
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60)
> ... 9 more
>
> Does that mean i should move from having a Pojo storing the result of the
> SQL retracted stream to Avro? trying to understand how to mitigate it.
>
> Thanks
>
> On Sat, Mar 9, 2019 at 4:41 PM Rong Rong <wa...@gmail.com> wrote:
>
>> Hi Shahar,
>>
>> From my understanding, if you use "groupby" withAggregateFunctions, they
>> save the accumulators to SQL internal states: which are invariant from your
>> input schema. Based on what you described I think that's why it is fine for
>> recovering from existing state.
>> I think one confusion you might have is the "toRetractStream" syntax.
>> This actually passes the "retracting" flag to the Flink planner to indicate
>> how the DataStream operator gets generated based on your SQL.
>>
>> So in my understanding, there's really no "state" associated with the
>> "retracting stream", but rather associated with the generated operators.
>> However, I am not expert in Table/SQL state recovery: I recall there were
>> an open JIRA[1] that might be related to your question regarding SQL/Table
>> generated operator recovery. Maybe @Fabian can provide more insight here?
>>
>> Regarding the rest of the pipeline, both "filter" and "map" operators are
>> stateless; and sink state recovery depends on what you do.
>>
>> --
>> Rong
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-6966
>>
>> On Fri, Mar 8, 2019 at 12:07 PM shkob1 <sh...@gmail.com>
>> wrote:
>>
>>> Thanks Rong,
>>>
>>> I have made some quick test changing the SQL select (adding a select
>>> field
>>> in the middle) and reran the job from a savepoint and it worked without
>>> any
>>> errors. I want to make sure i understand how at what point the state is
>>> stored and how does it work.
>>>
>>> Let's simplify the scenario and forget my specific case of dynamically
>>> generated pojo. let's focus on generic steps of:
>>> Source->register table->SQL select and group by session->retracted stream
>>> (Row)->transformToPojo (Custom Map function) ->pushToSink
>>>
>>> And let's assume the SQL select is changed (a field is added somewhere in
>>> the middle of the select field).
>>> So:
>>> We had intermediate results that are in the old format that are loaded
>>> from
>>> state to the new Row object in the retracted stream. is that an accurate
>>> statement? at what operator/format is the state stored in this case? is
>>> it
>>> the SQL result/Row? is it the Pojo? as this scenario does not fail for
>>> me im
>>> trying to understand how/where it is handled in Flink?
>>>
>>>
>>>
>>>
>>>
>>> --
>>> Sent from:
>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>>
>>

Re: Schema Evolution on Dynamic Schema

Posted by Shahar Cizer Kobrinsky <sh...@gmail.com>.
Thanks Guys,

I actually got an error now adding some fields into the select statement:

java.lang.RuntimeException: Error while getting state
at
org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:62)
at
org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:135)
at
org.apache.flink.table.runtime.aggregate.GroupAggProcessFunction.open(GroupAggProcessFunction.scala:74)
at
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
at
org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.open(LegacyKeyedProcessOperator.java:60)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.StateMigrationException: For heap
backends, the new state serializer must not be incompatible.
at
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.tryRegisterStateTable(HeapKeyedStateBackend.java:301)
at
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.createInternalState(HeapKeyedStateBackend.java:341)
at
org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
at
org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:63)
at
org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:241)
at
org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:290)
at
org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124)
at
org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60)
... 9 more

Does that mean i should move from having a Pojo storing the result of the
SQL retracted stream to Avro? trying to understand how to mitigate it.

Thanks

On Sat, Mar 9, 2019 at 4:41 PM Rong Rong <wa...@gmail.com> wrote:

> Hi Shahar,
>
> From my understanding, if you use "groupby" withAggregateFunctions, they
> save the accumulators to SQL internal states: which are invariant from your
> input schema. Based on what you described I think that's why it is fine for
> recovering from existing state.
> I think one confusion you might have is the "toRetractStream" syntax. This
> actually passes the "retracting" flag to the Flink planner to indicate how
> the DataStream operator gets generated based on your SQL.
>
> So in my understanding, there's really no "state" associated with the
> "retracting stream", but rather associated with the generated operators.
> However, I am not expert in Table/SQL state recovery: I recall there were
> an open JIRA[1] that might be related to your question regarding SQL/Table
> generated operator recovery. Maybe @Fabian can provide more insight here?
>
> Regarding the rest of the pipeline, both "filter" and "map" operators are
> stateless; and sink state recovery depends on what you do.
>
> --
> Rong
>
> [1] https://issues.apache.org/jira/browse/FLINK-6966
>
> On Fri, Mar 8, 2019 at 12:07 PM shkob1 <sh...@gmail.com> wrote:
>
>> Thanks Rong,
>>
>> I have made some quick test changing the SQL select (adding a select field
>> in the middle) and reran the job from a savepoint and it worked without
>> any
>> errors. I want to make sure i understand how at what point the state is
>> stored and how does it work.
>>
>> Let's simplify the scenario and forget my specific case of dynamically
>> generated pojo. let's focus on generic steps of:
>> Source->register table->SQL select and group by session->retracted stream
>> (Row)->transformToPojo (Custom Map function) ->pushToSink
>>
>> And let's assume the SQL select is changed (a field is added somewhere in
>> the middle of the select field).
>> So:
>> We had intermediate results that are in the old format that are loaded
>> from
>> state to the new Row object in the retracted stream. is that an accurate
>> statement? at what operator/format is the state stored in this case? is it
>> the SQL result/Row? is it the Pojo? as this scenario does not fail for me
>> im
>> trying to understand how/where it is handled in Flink?
>>
>>
>>
>>
>>
>> --
>> Sent from:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>
>

Re: Schema Evolution on Dynamic Schema

Posted by Rong Rong <wa...@gmail.com>.
Hi Shahar,

From my understanding, if you use "groupby" withAggregateFunctions, they
save the accumulators to SQL internal states: which are invariant from your
input schema. Based on what you described I think that's why it is fine for
recovering from existing state.
I think one confusion you might have is the "toRetractStream" syntax. This
actually passes the "retracting" flag to the Flink planner to indicate how
the DataStream operator gets generated based on your SQL.

So in my understanding, there's really no "state" associated with the
"retracting stream", but rather associated with the generated operators.
However, I am not expert in Table/SQL state recovery: I recall there were
an open JIRA[1] that might be related to your question regarding SQL/Table
generated operator recovery. Maybe @Fabian can provide more insight here?

Regarding the rest of the pipeline, both "filter" and "map" operators are
stateless; and sink state recovery depends on what you do.

--
Rong

[1] https://issues.apache.org/jira/browse/FLINK-6966

On Fri, Mar 8, 2019 at 12:07 PM shkob1 <sh...@gmail.com> wrote:

> Thanks Rong,
>
> I have made some quick test changing the SQL select (adding a select field
> in the middle) and reran the job from a savepoint and it worked without any
> errors. I want to make sure i understand how at what point the state is
> stored and how does it work.
>
> Let's simplify the scenario and forget my specific case of dynamically
> generated pojo. let's focus on generic steps of:
> Source->register table->SQL select and group by session->retracted stream
> (Row)->transformToPojo (Custom Map function) ->pushToSink
>
> And let's assume the SQL select is changed (a field is added somewhere in
> the middle of the select field).
> So:
> We had intermediate results that are in the old format that are loaded from
> state to the new Row object in the retracted stream. is that an accurate
> statement? at what operator/format is the state stored in this case? is it
> the SQL result/Row? is it the Pojo? as this scenario does not fail for me
> im
> trying to understand how/where it is handled in Flink?
>
>
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>

Re: Schema Evolution on Dynamic Schema

Posted by shkob1 <sh...@gmail.com>.
Thanks Rong,

I have made some quick test changing the SQL select (adding a select field
in the middle) and reran the job from a savepoint and it worked without any
errors. I want to make sure i understand how at what point the state is
stored and how does it work.

Let's simplify the scenario and forget my specific case of dynamically
generated pojo. let's focus on generic steps of: 
Source->register table->SQL select and group by session->retracted stream
(Row)->transformToPojo (Custom Map function) ->pushToSink

And let's assume the SQL select is changed (a field is added somewhere in
the middle of the select field).
So:
We had intermediate results that are in the old format that are loaded from
state to the new Row object in the retracted stream. is that an accurate
statement? at what operator/format is the state stored in this case? is it
the SQL result/Row? is it the Pojo? as this scenario does not fail for me im
trying to understand how/where it is handled in Flink?





--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Schema Evolution on Dynamic Schema

Posted by Rong Rong <wa...@gmail.com>.
Hi Shahar,

1. Are you referring to that the incoming data source is published as JSON
and you have a customized Pojo source function / table source that converts
it? In that case it is you that maintains the schema evolution support am I
correct? For Avro I think you can refer to [1].
2. If you change the SQL, you will have to recompile and rerun your job.
This means the new compilation of the SQL will yield correct logic to run
against your new schema. I don't foresee this to be an issue. For the
second problem: yes it is your customized serialization sink function's
responsibility to convert Row into the output class objects. I am not sure
if this is the piece of code that you are looking for [2] if you are using
Avro, but you might be able to leverage that?

If you are sticking with your own format of generated/dynamic class, you
might have to create that in your custom source/sink table.

Thanks,
Rong

[1]
https://github.com/apache/flink/tree/release-1.7/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro
[2]
https://github.com/apache/flink/blob/release-1.7/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowSerializationSchema.java#L170

On Thu, Mar 7, 2019 at 11:20 AM Shahar Cizer Kobrinsky <
shahar.kobrinsky@gmail.com> wrote:

> Thanks for the response Rong. Would be happy to clarify more.
> So there are two possible changes that could happen:
>
>    1. There could be a change in the incoming source schema. Since
>    there's a deserialization phase here (JSON -> Pojo) i expect a couple of
>    options. Backward compatible changes to the JSON should not have an impact
>    (as the Pojo is the same), however we might want to change the Pojo which i
>    believe is a state evolving action. I do want to migrate the Pojo to Avro -
>    will that suffice for Schema evolution feature to work?
>    2. The other possible change is the SQL select fields change, as
>    mention someone could add/delete/change-order another field to the SQL
>    Select. I do see this as an issue per the way i transform the Row object to
>    the dynamically generated class. This is done today through indices of the
>    class fields and the ones of the Row object. This seems like an issue for
>    when for example a select field is added in the middle and now there's an
>    older Row which fields order is not matching the (new) generated Class
>    fields order. I'm thinking of how to solve that one - i imagine this is not
>    something the schema evolution feature can solve (am i right?). im thinking
>    on whether there is a way i can transform the Row object to my generated
>    class by maybe the Row's column names corresponding to the generated class
>    field names, though i don't see Row object has any notion of column names.
>
> Would love to hear your thoughts. If you want me to paste some code here i
> can do so.
>
> Shahar
>
> On Thu, Mar 7, 2019 at 10:40 AM Rong Rong <wa...@gmail.com> wrote:
>
>> Hi Shahar,
>>
>> I wasn't sure which schema are you describing that is going to "evolve"
>> (is it the registered_table? or the output sink?). It will be great if you
>> can clarify more.
>>
>> For the example you provided, IMO it is more considered as logic change
>> instead of schema evolution:
>> - if you are changing max(c) to max(d) in your query. I don't think this
>> qualifies as schema evolution.
>> - if you are adding another column "max(d)" to your query along with your
>> existing "max(c)" that might be considered as a backward compatible change.
>> However, either case you will have to restart your logic, you can also
>> consult how state schema evolution [1], and there are many other problems
>> that can be tricky as well[2,3].
>>
>> Thanks,
>> Rong
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/state/schema_evolution.html
>> [2]
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Window-operator-schema-evolution-savepoint-deserialization-fail-td23079.html
>> [3]
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Savepointing-with-Avro-Schema-change-td19290.html#a19293
>>
>>
>> On Wed, Mar 6, 2019 at 12:52 PM shkob1 <sh...@gmail.com>
>> wrote:
>>
>>> Hey,
>>>
>>> My job is built on SQL that is injected as an input to the job. so lets
>>> take
>>> an example of
>>>
>>> Select a,max(b) as MaxB,max(c) as MaxC FROM registered_table GROUP BY a
>>>
>>> (side note: in order for the state not to grow indefinitely i'm
>>> transforming
>>> to a retracted stream and filtering based on a custom trigger)
>>>
>>> In order to get the output as a Json format i basically created a way to
>>> dynamically generate a class and registering it to the class loader, so
>>> when
>>> transforming to the retracted stream im doing something like:
>>>
>>> Table result = tableEnv.sqlQuery(sqlExpression);
>>> tableEnv.toRetractStream(result, Row.class, config)
>>> .filter(tuple -> tuple.f0)
>>> .map(new RowToDynamicClassMapper(sqlSelectFields))
>>> .addSink(..)
>>>
>>> This actually works pretty good (though i do need to make sure to
>>> register
>>> the dynamic class to the class loader whenever the state is loaded)
>>>
>>> Im now looking into "schema evolution" - which basically means what
>>> happens
>>> when the query is changed (say max(c) is removed, and maybe max(d) is
>>> added). I dont know if that fits the classic "schema evolution" feature
>>> or
>>> should that be thought about differently. Would be happy to get some
>>> thoughts.
>>>
>>> Thanks!
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> --
>>> Sent from:
>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>>
>>

Re: Schema Evolution on Dynamic Schema

Posted by Shahar Cizer Kobrinsky <sh...@gmail.com>.
Thanks for the response Rong. Would be happy to clarify more.
So there are two possible changes that could happen:

   1. There could be a change in the incoming source schema. Since there's
   a deserialization phase here (JSON -> Pojo) i expect a couple of options.
   Backward compatible changes to the JSON should not have an impact (as the
   Pojo is the same), however we might want to change the Pojo which i believe
   is a state evolving action. I do want to migrate the Pojo to Avro - will
   that suffice for Schema evolution feature to work?
   2. The other possible change is the SQL select fields change, as mention
   someone could add/delete/change-order another field to the SQL Select. I do
   see this as an issue per the way i transform the Row object to the
   dynamically generated class. This is done today through indices of the
   class fields and the ones of the Row object. This seems like an issue for
   when for example a select field is added in the middle and now there's an
   older Row which fields order is not matching the (new) generated Class
   fields order. I'm thinking of how to solve that one - i imagine this is not
   something the schema evolution feature can solve (am i right?). im thinking
   on whether there is a way i can transform the Row object to my generated
   class by maybe the Row's column names corresponding to the generated class
   field names, though i don't see Row object has any notion of column names.

Would love to hear your thoughts. If you want me to paste some code here i
can do so.

Shahar

On Thu, Mar 7, 2019 at 10:40 AM Rong Rong <wa...@gmail.com> wrote:

> Hi Shahar,
>
> I wasn't sure which schema are you describing that is going to "evolve"
> (is it the registered_table? or the output sink?). It will be great if you
> can clarify more.
>
> For the example you provided, IMO it is more considered as logic change
> instead of schema evolution:
> - if you are changing max(c) to max(d) in your query. I don't think this
> qualifies as schema evolution.
> - if you are adding another column "max(d)" to your query along with your
> existing "max(c)" that might be considered as a backward compatible change.
> However, either case you will have to restart your logic, you can also
> consult how state schema evolution [1], and there are many other problems
> that can be tricky as well[2,3].
>
> Thanks,
> Rong
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/state/schema_evolution.html
> [2]
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Window-operator-schema-evolution-savepoint-deserialization-fail-td23079.html
> [3]
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Savepointing-with-Avro-Schema-change-td19290.html#a19293
>
>
> On Wed, Mar 6, 2019 at 12:52 PM shkob1 <sh...@gmail.com> wrote:
>
>> Hey,
>>
>> My job is built on SQL that is injected as an input to the job. so lets
>> take
>> an example of
>>
>> Select a,max(b) as MaxB,max(c) as MaxC FROM registered_table GROUP BY a
>>
>> (side note: in order for the state not to grow indefinitely i'm
>> transforming
>> to a retracted stream and filtering based on a custom trigger)
>>
>> In order to get the output as a Json format i basically created a way to
>> dynamically generate a class and registering it to the class loader, so
>> when
>> transforming to the retracted stream im doing something like:
>>
>> Table result = tableEnv.sqlQuery(sqlExpression);
>> tableEnv.toRetractStream(result, Row.class, config)
>> .filter(tuple -> tuple.f0)
>> .map(new RowToDynamicClassMapper(sqlSelectFields))
>> .addSink(..)
>>
>> This actually works pretty good (though i do need to make sure to register
>> the dynamic class to the class loader whenever the state is loaded)
>>
>> Im now looking into "schema evolution" - which basically means what
>> happens
>> when the query is changed (say max(c) is removed, and maybe max(d) is
>> added). I dont know if that fits the classic "schema evolution" feature or
>> should that be thought about differently. Would be happy to get some
>> thoughts.
>>
>> Thanks!
>>
>>
>>
>>
>>
>>
>>
>> --
>> Sent from:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>
>

Re: Schema Evolution on Dynamic Schema

Posted by Rong Rong <wa...@gmail.com>.
Hi Shahar,

I wasn't sure which schema are you describing that is going to "evolve" (is
it the registered_table? or the output sink?). It will be great if you can
clarify more.

For the example you provided, IMO it is more considered as logic change
instead of schema evolution:
- if you are changing max(c) to max(d) in your query. I don't think this
qualifies as schema evolution.
- if you are adding another column "max(d)" to your query along with your
existing "max(c)" that might be considered as a backward compatible change.
However, either case you will have to restart your logic, you can also
consult how state schema evolution [1], and there are many other problems
that can be tricky as well[2,3].

Thanks,
Rong

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/state/schema_evolution.html
[2]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Window-operator-schema-evolution-savepoint-deserialization-fail-td23079.html
[3]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Savepointing-with-Avro-Schema-change-td19290.html#a19293


On Wed, Mar 6, 2019 at 12:52 PM shkob1 <sh...@gmail.com> wrote:

> Hey,
>
> My job is built on SQL that is injected as an input to the job. so lets
> take
> an example of
>
> Select a,max(b) as MaxB,max(c) as MaxC FROM registered_table GROUP BY a
>
> (side note: in order for the state not to grow indefinitely i'm
> transforming
> to a retracted stream and filtering based on a custom trigger)
>
> In order to get the output as a Json format i basically created a way to
> dynamically generate a class and registering it to the class loader, so
> when
> transforming to the retracted stream im doing something like:
>
> Table result = tableEnv.sqlQuery(sqlExpression);
> tableEnv.toRetractStream(result, Row.class, config)
> .filter(tuple -> tuple.f0)
> .map(new RowToDynamicClassMapper(sqlSelectFields))
> .addSink(..)
>
> This actually works pretty good (though i do need to make sure to register
> the dynamic class to the class loader whenever the state is loaded)
>
> Im now looking into "schema evolution" - which basically means what happens
> when the query is changed (say max(c) is removed, and maybe max(d) is
> added). I dont know if that fits the classic "schema evolution" feature or
> should that be thought about differently. Would be happy to get some
> thoughts.
>
> Thanks!
>
>
>
>
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>