Posted to dev@flink.apache.org by Seth Wiesman <sj...@gmail.com> on 2019/06/04 11:14:40 UTC

Re: [Discuss] FLIP-43: Savepoint Connector

It seems like a recurring piece of feedback was a different name. I’d like to propose moving the functionality to the libraries module and naming this the State Processing API. 

Seth

> On May 31, 2019, at 3:47 PM, Seth Wiesman <sj...@gmail.com> wrote:
> 
> The SavepointOutputFormat only writes out the savepoint metadata file and should be mostly ignored.
> 
> The actual state is written out by stream operators and tied into the flink runtime[1, 2, 3].
> 
> This is the most important part and the piece that I don’t think can be reasonably extracted.
> 
> Seth
> 
> [1] https://github.com/sjwiesman/flink/blob/savepoint-connector/flink-connectors/flink-connector-savepoint/src/main/java/org/apache/flink/connectors/savepoint/operators/KeyedStateBootstrapOperator.java#L84
> 
> [2] https://github.com/sjwiesman/flink/blob/savepoint-connector/flink-connectors/flink-connector-savepoint/src/main/java/org/apache/flink/connectors/savepoint/output/SnapshotUtils.java
> 
> [3] https://github.com/sjwiesman/flink/blob/savepoint-connector/flink-connectors/flink-connector-savepoint/src/main/java/org/apache/flink/connectors/savepoint/output/BoundedOneInputStreamTaskRunner.java
> 
>> On May 31, 2019, at 3:08 PM, Jan Lukavský <je...@seznam.cz> wrote:
>> 
>> Hi Seth,
>> 
>> yes, that helped! :-)
>> 
>> What I was looking for is essentially `org.apache.flink.connectors.savepoint.output.SavepointOutputFormat`. It would be great if this could be written in a way that would enable reusing it in a different engine (as I mentioned, Apache Spark). There seem to be some issues, though. It uses the Savepoint interface, which uses several other objects and interfaces from Flink's runtime. Maybe some convenience API might help. Apache Beam handles operator naming, so that should definitely be transferable between systems, but I'm not sure how to construct an OperatorID from such a name. Do you think it would be possible to come up with something that could be used in this portable way?
>> 
>> I understand there are some more conditions that need to be satisfied (grouping, aggregating, ...), which would of course have to be handled by the target system. But Apache Beam can help leverage that. My idea would be that there could be a runner-specific PTransform that takes a PCollection of tuples like `(operator name, key, state name, value1), (operator name, key, state name, value2)`, and the runner's responsibility would be to group/aggregate them so that they can be written by the runner-provided writer (output format).
>> 
>> All of this would need a lot more design; these are just ideas of "what could be possible". I was just wondering if this FLIP could take some first steps towards it.
>> 
>> Many thanks for comments,
>> 
>> Jan
>> 
>>> On 5/31/19 8:12 PM, Seth Wiesman wrote:
>>> @Jan Gotcha,
>>> 
>>> So in reusing components, it explicitly is not a writer. This is not a savepoint output format in the way we have a parquet output format. The reason for the Transform API is to hide the underlying details; it does not simply append an output writer to the end of a dataset. This gets into the implementation details, but at a high level, the dataset is:
>>> 
>>> 1) partitioned using key groups
>>> 2) data is run through a standard stream operator that takes a snapshot of its state after processing all records and outputs metadata handles for each subtask
>>> 3) those metadata handles are aggregated down to a single savepoint handle
>>> 4) that handle is written out as a final metadata file
>>> 
>>> What’s important here is that the API actually depends on the data flow collection, and state is written out as a side effect of taking a savepoint. The FLIP describes a loose coupling to the DataSet API for eventual migration to BoundedStream, that is true. However, the API does require knowing what concrete data flow is being used to perform these re-partitionings and post-aggregations.
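>>> 
>>> For illustration, bootstrapping keyed state end to end looks roughly like the sketch below (based on the FLIP and my prototype; class and method names may still change, and the state backend, input dataset and AccountBootstrapper function, a KeyedStateBootstrapFunction, are placeholders):
>>> 
>>> ```
>>> // Run accountDataSet through a keyed bootstrap function; the resulting
>>> // transformation knows how to snapshot the keyed state it produces.
>>> BootstrapTransformation<Account> transformation = OperatorTransformation
>>>     .bootstrapWith(accountDataSet)
>>>     .keyBy(acc -> acc.id)
>>>     .transform(new AccountBootstrapper());
>>> 
>>> // Assign the transformation to an operator uid and write out the final
>>> // savepoint metadata file.
>>> Savepoint
>>>     .create(stateBackend, 128) // 128 = max parallelism
>>>     .withOperator("uid", transformation)
>>>     .write(savepointPath);
>>> ```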
>>> 
>>> I’m linking to my prototype implementation, particularly what actually happens when you call write and run the transformations. Let me know if that helps clarify.
>>> 
>>> Seth
>>> 
>>> https://github.com/sjwiesman/flink/blob/savepoint-connector/flink-connectors/flink-connector-savepoint/src/main/java/org/apache/flink/connectors/savepoint/api/WritableSavepoint.java#L63
>>> 
>>> 
>>> 
>>>> On May 31, 2019, at 7:46 AM, Jan Lukavský <je...@seznam.cz> wrote:
>>>> 
>>>> Hi Seth,
>>>> 
>>>> that sounds reasonable. What I was asking for was not to reverse engineer the binary format, but to make the savepoint write API a little more reusable, so that it could be wrapped into some other technology. I don't know the details well enough to propose a solution, but it seems to me that it could be possible to use something like a Writer instead of a Transform. Or maybe the Transform could use the Writer internally; the goal is just to make it possible to create the savepoint from "outside" of Flink (with some library, of course).
>>>> 
>>>> Jan
>>>> 
>>>>> On 5/31/19 1:17 PM, Seth Wiesman wrote:
>>>>> @Konstantin agreed, that was a large impetus for this feature. Also, I am happy to change the name to something that better describes the feature set.
>>>>> 
>>>>> @Jan
>>>>> 
>>>>> Savepoints depend heavily on a number of Flink internal components that may change between versions: state backend internals, type serializers, the specific hash function used to turn a UID into an OperatorID, etc. I consider it a feature of this proposal that the library depends on those internal components instead of reverse engineering the binary format. This way, as those internals change or new state features are added (think the recent addition of TTL), we will get support automatically. I do not believe anything else is maintainable.
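>>>>> 
>>>>> As a concrete example of that coupling, the OperatorID for an operator with a user-specified uid is derived by hashing the uid, roughly like the sketch below (my reading of the current Flink sources, not something this proposal exposes; if that hash ever changed, a hand-rolled writer would silently produce savepoints Flink cannot map back to operators):
>>>>> 
>>>>> ```
>>>>> import com.google.common.hash.Hashing;
>>>>> import java.nio.charset.StandardCharsets;
>>>>> import org.apache.flink.runtime.jobgraph.OperatorID;
>>>>> 
>>>>> // Sketch: derive the OperatorID the runtime assigns to an operator with
>>>>> // the given uid, using the same Murmur3 hash as the StreamGraph translation.
>>>>> public final class OperatorIds {
>>>>>     private OperatorIds() {}
>>>>> 
>>>>>     public static OperatorID fromUid(String uid) {
>>>>>         byte[] hash = Hashing.murmur3_128(0)
>>>>>                 .newHasher()
>>>>>                 .putString(uid, StandardCharsets.UTF_8)
>>>>>                 .hash()
>>>>>                 .asBytes();
>>>>>         return new OperatorID(hash);
>>>>>     }
>>>>> }
>>>>> ```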
>>>>> 
>>>>> Seth
>>>>> 
>>>>>> On May 31, 2019, at 5:56 AM, Jan Lukavský <je...@seznam.cz> wrote:
>>>>>> 
>>>>>> Hi,
>>>>>> 
>>>>>> this is an awesome and really useful feature. If I might ask for one thing to consider - would it be possible to make the Savepoint manipulation API (at least writing the Savepoint) less dependent on other parts of Flink internals (e.g. KeyedStateBootstrapFunction) and provide something more general (e.g. some generic Writer)? The reason I'm asking is that I can totally imagine a situation where users might want to create bootstrapped state with some other runner (e.g. Apache Spark), and then run Apache Flink after the state has been created. This makes even more sense in the context of Apache Beam, which provides all the necessary work to make this happen. The question is - would it be possible to design this feature so that writing the savepoint from a different runner would be possible?
>>>>>> 
>>>>>> Cheers,
>>>>>> 
>>>>>> Jan
>>>>>> 
>>>>>>> On 5/30/19 1:14 AM, Seth Wiesman wrote:
>>>>>>> Hey Everyone!
>>>>>>> 
>>>>>>> Gordon and I have been discussing adding a savepoint connector to flink for reading, writing and modifying savepoints.
>>>>>>> 
>>>>>>> This is useful for:
>>>>>>> 
>>>>>>>    Analyzing state for interesting patterns
>>>>>>>    Troubleshooting or auditing jobs by checking for discrepancies in state
>>>>>>>    Bootstrapping state for new applications
>>>>>>>    Modifying savepoints such as:
>>>>>>>        Changing max parallelism
>>>>>>>        Making breaking schema changes
>>>>>>>        Correcting invalid state
>>>>>>> 
>>>>>>> We are looking forward to your feedback!
>>>>>>> 
>>>>>>> This is the FLIP:
>>>>>>> 
>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-43%3A+Savepoint+Connector
>>>>>>> 
>>>>>>> Seth
>>>>>>> 
>>>>>>> 
>>>>>>> 

Re: [Discuss] FLIP-43: Savepoint Connector

Posted by Jark Wu <im...@gmail.com>.
Hi Gordon,

The modify max parallelism API looks good to me. Thank you and Seth for the
great work on it.

Cheers,
Jark


Re: [Discuss] FLIP-43: Savepoint Connector

Posted by "Tzu-Li (Gordon) Tai" <tz...@apache.org>.
Hi Jark,

Thanks for the reminder. I've updated the FLIP name in confluence to match
the new name "State Processor API".

Concerning an API for changing max parallelism:
That is actually in the works and has been considered, and would look
something like -
```
ExistingSavepoint savepoint = Savepoint.load(oldPath);
savepoint.modifyMaxParallelism(newParallelism, newPath);
```

For the contribution, we've currently reached the write part of the State
Processor API [1].
After that is settled, we can then start thinking about adding this max
parallelism change feature as a follow-up.

Cheers,
Gordon

[1] https://github.com/apache/flink/pull/8861


Re: [Discuss] FLIP-43: Savepoint Connector

Posted by Jark Wu <im...@gmail.com>.
Thanks for the awesome FLIP.

I think it will be very useful in state migration scenarios. We are also
looking for a state reuse solution for SQL jobs, and I think this feature
will help a lot.
Looking forward to having it in the near future.

Regarding the naming, I'm +1 to "State Processing API". Should we also
update the FLIP name in confluence?

Btw, what do you think about adding a shortcut API for changing max parallelism
for savepoints? This is a very common scenario.
But from my understanding, it takes a lot of trivial work to achieve
under the current API.

Best,
Jark



Re: [Discuss] FLIP-43: Savepoint Connector

Posted by "Tzu-Li (Gordon) Tai" <tz...@apache.org>.
On Wed, Jun 5, 2019 at 6:39 AM Xiaowei Jiang <xi...@gmail.com> wrote:

>  Hi Gordon & Seth, this looks like a very useful feature for analyzing and
> managing state.
> I agree that using DataSet is probably the most practical choice right
> now. But in the longer run, adding TableAPI support for this will be nice.
>

Agreed. Migrating this API in the future to the TableAPI is definitely
something we have considered.


> When analyzing the savepoint, I assume that the state backend restores the
> state first? This approach is generic and works for any state backend.


Yes, that is correct. The process of reading state in snapshots is
currently:
1) the metadata file is read when creating the input splits for the
InputFormat. Each input split is assigned operator state and key-group
state handles.
2) For each input split, a state backend is launched and is restored with
all state of the assigned state handles. Only the requested state is then
transformed into DataSets (using the savepoint.read*State(...) methods).
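
To make that concrete, using the read side of the API would look roughly like
the sketch below (class and method names follow the FLIP and the current PR as
I understand them and may still change; MyPojo and ReaderFunction are the
placeholder types from the wiki example, and stateBackend is whatever backend
the savepoint was written with):

```
ExecutionEnvironment bEnv = ExecutionEnvironment.getExecutionEnvironment();

// Load an existing savepoint; the backend is used to restore the state
// handles assigned to each input split.
ExistingSavepoint savepoint = Savepoint.load(bEnv, "hdfs:///path/to/savepoint", stateBackend);

// Non-keyed (operator) state comes back as a flat DataSet.
DataSet<Integer> listState = savepoint.readListState("my-uid", "list-state", Types.INT);

// Keyed state is produced by running a reader function over the restored backend.
DataSet<MyPojo> keyedState = savepoint.readKeyedState("my-uid", new ReaderFunction());
```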


> However, sometimes it may be more efficient to directly analyze the
> files on DFS without copying. We can probably add an interface to allow the
> state backend to optimize such behavior in the future.


That sounds like an interesting direction, though at the moment it may only
make sense for full savepoints / checkpoints.
One blocker for enabling this is having the type information of state
available in the snapshot metadata file so that schema / type of state is
known before actually reading state.
Making state schema / type information available in the metadata file is
already a recurring discussion in this thread that would be useful for not
only this feature you mentioned, but also for features like SQL integration
in the future.
Therefore, this seems to be a reasonable next step when extending on top of
the initial scope of the API proposed in the FLIP.


> Also a quick question on the example in the wiki: DataSet<MyPojo> keyedState =
> operator.readKeyedState("uid", new ReaderFunction()); Should
> operator.readKeyedState be replaced with savepoint.readKeyedState here?
>

Correct, this is indeed a typo. I've corrected this in the FLIP.

Cheers,
Gordon


>
> Regards, Xiaowei
>
>     On Tuesday, June 4, 2019, 6:56:00 AM PDT, Aljoscha Krettek <
> aljoscha@apache.org> wrote:
>
>  +1 I think this is a very valuable new addition and we should try and not
> get stuck on trying to design the perfect solution for everything
>
> > On 4. Jun 2019, at 13:24, Tzu-Li (Gordon) Tai <tz...@apache.org>
> wrote:
> >
> > +1 to renaming it as State Processing API and adding it under the
> > flink-libraries module.
> >
> > I also think we can start with the development of the feature. From the
> > > feedback so far, it seems like we're in a good spot to add in at least the
> > > initial version of this API, hopefully making it ready for 1.9.0.
> >
> > Cheers,
> > Gordon
> >
> > On Tue, Jun 4, 2019 at 7:14 PM Seth Wiesman <sj...@gmail.com> wrote:
> >
> >> It seems like a recurring piece of feedback was a different name. I’d
> like
> >> to propose moving the functionality to the libraries module and naming
> this
> >> the State Processing API.
> >>
> >> Seth
> >>
> >>> On May 31, 2019, at 3:47 PM, Seth Wiesman <sj...@gmail.com> wrote:
> >>>
> >>> The SavepointOutputFormat only writes out the savepoint metadata file
> >> and should be mostly ignored.
> >>>
> >>> The actual state is written out by stream operators and tied into the
> >> flink runtime[1, 2, 3].
> >>>
> >>> This is the most important part and the piece that I don’t think can be
> >> reasonably extracted.
> >>>
> >>> Seth
> >>>
> >>> [1]
> >>
> https://github.com/sjwiesman/flink/blob/savepoint-connector/flink-connectors/flink-connector-savepoint/src/main/java/org/apache/flink/connectors/savepoint/operators/KeyedStateBootstrapOperator.java#L84
> >>>
> >>> [2]
> >>
> https://github.com/sjwiesman/flink/blob/savepoint-connector/flink-connectors/flink-connector-savepoint/src/main/java/org/apache/flink/connectors/savepoint/output/SnapshotUtils.java
> >>>
> >>> [3]
> >>
> https://github.com/sjwiesman/flink/blob/savepoint-connector/flink-connectors/flink-connector-savepoint/src/main/java/org/apache/flink/connectors/savepoint/output/BoundedOneInputStreamTaskRunner.java
> >>>
> >>>> On May 31, 2019, at 3:08 PM, Jan Lukavský <je...@seznam.cz> wrote:
> >>>>
> >>>> Hi Seth,
> >>>>
> >>>> yes, that helped! :-)
> >>>>
> >>>> What I was looking for is essentially
> >> `org.apache.flink.connectors.savepoint.output.SavepointOutputFormat`. It
> >> would be great if this could be written in a way, that would enable
> reusing
> >> it in different engine (as I mentioned - Apache Spark). There seem to be
> >> some issues though. It uses interface Savepoint, which uses several
> other
> >> objects and interfaces from Flink's runtime. Maybe some convenience API
> >> might help - Apache Beam, handles operator naming, so that definitely
> >> should be transitionable between systems, but I'm not sure, how to
> >> construct OperatorID from this name. Would you think, that it is
> possible
> >> to come up with something that could be used in this portable way?
> >>>>
> >>>> I understand, there are some more conditions, that need to be
> satisfied
> >> (grouping, aggregating, ...), which would of course have to be handled
> by
> >> the target system. But Apache Beam can help leverage that. My idea would
> >> be, that there can be runner specified PTransform, that takes
> PCollection
> >> of some tuples of `(operator name, key, state name, value1), (operator
> >> name, key, state name, value2)`, and Runner's responsibility would be to
> >> group/aggregate this so that it can be written by runner's provided
> writer
> >> (output format).
> >>>>
> >>>> All of this would need a lot more design, these are just ideas of
> "what
> >> could be possible", I was just wondering if this FLIP can make some
> first
> >> steps towards this.
> >>>>
> >>>> Many thanks for comments,
> >>>>
> >>>> Jan
> >>>>
> >>>>> On 5/31/19 8:12 PM, Seth Wiesman wrote:
> >>>>> @Jan Gotcha,
> >>>>>
> >>>>> So in reusing components it explicitly is not a writer. This is not a
> >> savepoint output format in the way we have a parquet output format. The
> >> reason for the Transform api is to hide the underlying details, it does
> not
> >> simply append a output writer to the end of a dataset. This gets into
> the
> >> implementation details but at a high level, the dataset is:
> >>>>>
> >>>>> 1) partitioned using key groups
> >>>>> 2) data is run through a standard stream operator that takes a
> >> snapshot of its state after processing all records and outputs metadata
> >> handles for each subtask
> >>>>> 3) those metadata handles are aggregated down to a single savepoint
> >> handle
> >>>>> 4) that handle is written out as a final metadata file
> >>>>>
> >>>>> What’s important here is that the API actually depends on the data
> >>>>> flow collection, and state is written out as a side effect of taking
> >>>>> a savepoint. The FLIP describes a loose coupling to the DataSet API
> >>>>> for eventual migration to BoundedStream, that is true. However, the
> >>>>> API does require knowing what concrete data flow is being used to
> >>>>> perform these re-partitionings and post aggregations.
> >>>>>
> >>>>> I’m linking to my prototype implementation, particularly what
> actually
> >> happens when you call write and run the transformations. Let me know if
> >> that helps clarify.
> >>>>>
> >>>>> Seth
> >>>>>
> >>>>>
> >>
> https://github.com/sjwiesman/flink/blob/savepoint-connector/flink-connectors/flink-connector-savepoint/src/main/java/org/apache/flink/connectors/savepoint/api/WritableSavepoint.java#L63
> >>>>>
> >>>>>
> >>>>>
> >>>>>> On May 31, 2019, at 7:46 AM, Jan Lukavský <je...@seznam.cz> wrote:
> >>>>>>
> >>>>>> Hi Seth,
> >>>>>>
> >>>>>> that sounds reasonable. What I was asking for was not to reverse
> >>>>>> engineer the binary format, but to make the savepoint write API a
> >>>>>> little more reusable, so that it could be wrapped into some other
> >>>>>> technology. I don't know the details well enough to propose a
> >>>>>> solution, but it seems to me that it could be possible to use
> >>>>>> something like a Writer instead of a Transform. Or maybe the
> >>>>>> Transform can use the Writer internally; the goal is just to enable
> >>>>>> creating the savepoint from "outside" of Flink (with some library, of
> >>>>>> course).
> >>>>>>
> >>>>>> Jan
> >>>>>>
> >>>>>>> On 5/31/19 1:17 PM, Seth Wiesman wrote:
> >>>>>>> @Konstantin agreed, that was a large impetus for this feature. Also
> >>>>>>> I am happy to change the name to something that better describes
> >>>>>>> the feature set.
> >>>>>>>
> >>>>>>> @Jan
> >>>>>>>
> >>>>>>> Savepoints depend heavily on a number of flink internal components
> >>>>>>> that may change between versions: state backend internals, type
> >>>>>>> serializers, the specific hash function used to turn a UID into an
> >>>>>>> OperatorID, etc. I consider it a feature of this proposal that the
> >>>>>>> library depends on those internal components instead of reverse
> >>>>>>> engineering the binary format. This way, as those internals change
> >>>>>>> or new state features are added (think the recent addition of TTL),
> >>>>>>> we will get support automatically. I do not believe anything else is
> >>>>>>> maintainable.
> >>>>>>>
> >>>>>>> Seth
> >>>>>>>
> >>>>>>>> On May 31, 2019, at 5:56 AM, Jan Lukavský <je...@seznam.cz>
> wrote:
> >>>>>>>>
> >>>>>>>> Hi,
> >>>>>>>>
> >>>>>>>> this is awesome, and a really useful feature. If I might ask for
> >>>>>>>> one thing to consider - would it be possible to make the Savepoint
> >>>>>>>> manipulation API (at least writing the Savepoint) less dependent
> >>>>>>>> on other parts of Flink internals (e.g.
> >>>>>>>> `KeyedStateBootstrapFunction`) and provide something more general
> >>>>>>>> (e.g. some generic Writer)? Why I'm asking for that - I can totally
> >>>>>>>> imagine a situation where users might want to create bootstrapped
> >>>>>>>> state with some other runner (e.g. Apache Spark), and then run
> >>>>>>>> Apache Flink after the state has been created. This makes even more
> >>>>>>>> sense in the context of Apache Beam, which provides all the
> >>>>>>>> necessary work to make this happen. The question is - would it be
> >>>>>>>> possible to design this feature so that writing the savepoint from
> >>>>>>>> a different runner would be possible?
> >>>>>>>>
> >>>>>>>> Cheers,
> >>>>>>>>
> >>>>>>>> Jan
> >>>>>>>>
> >>>>>>>>> On 5/30/19 1:14 AM, Seth Wiesman wrote:
> >>>>>>>>> Hey Everyone!
> >>>>>>>>>
> >>>>>>>>> Gordon and I have been discussing adding a savepoint connector to
> >> flink for reading, writing and modifying savepoints.
> >>>>>>>>>
> >>>>>>>>> This is useful for:
> >>>>>>>>>
> >>>>>>>>>  Analyzing state for interesting patterns
> >>>>>>>>>  Troubleshooting or auditing jobs by checking for discrepancies
> >> in state
> >>>>>>>>>  Bootstrapping state for new applications
> >>>>>>>>>  Modifying savepoints such as:
> >>>>>>>>>      Changing max parallelism
> >>>>>>>>>      Making breaking schema changes
> >>>>>>>>>      Correcting invalid state
> >>>>>>>>>
> >>>>>>>>> We are looking forward to your feedback!
> >>>>>>>>>
> >>>>>>>>> This is the FLIP:
> >>>>>>>>>
> >>>>>>>>>
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-43%3A+Savepoint+Connector
> >>>>>>>>>
> >>>>>>>>> Seth
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>
>
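
To make the write path Seth describes in the quoted thread above a bit more
concrete, here is a rough sketch of what bootstrapping and writing a savepoint
could look like from the user side. It is based on the prototype linked in the
thread and on the FLIP, so the package, class and method names used here are
assumptions and may differ from whatever is finally merged.

    // Imports from flink-core / flink-java; the savepoint-connector classes
    // (OperatorTransformation, BootstrapTransformation, Savepoint,
    // KeyedStateBootstrapFunction) are assumed from the prototype and their
    // package may still change.
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.runtime.state.memory.MemoryStateBackend;

    public class BootstrapSavepointExample {

        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // Bootstrap data: (account id, balance) pairs to turn into keyed state.
            DataSet<Tuple2<Long, Double>> accounts = env.fromElements(
                    Tuple2.of(1L, 100.0),
                    Tuple2.of(2L, 50.0));

            // Steps 1 and 2: partition by key group and run the data through a
            // bootstrap operator that snapshots its state after the last record.
            BootstrapTransformation<Tuple2<Long, Double>> transformation =
                    OperatorTransformation
                            .bootstrapWith(accounts)
                            .keyBy(account -> account.f0)
                            .transform(new BalanceBootstrapFunction());

            // Steps 3 and 4: aggregate the per-subtask metadata handles and
            // write the final savepoint metadata file.
            Savepoint
                    .create(new MemoryStateBackend(), 128) // 128 = max parallelism
                    .withOperator("accounts-uid", transformation)
                    .write("hdfs:///path/to/new-savepoint");

            env.execute("bootstrap savepoint");
        }

        /** Writes each record into keyed state, which then ends up in the savepoint. */
        public static class BalanceBootstrapFunction
                extends KeyedStateBootstrapFunction<Long, Tuple2<Long, Double>> {

            private transient ValueState<Double> balance;

            @Override
            public void open(Configuration parameters) {
                balance = getRuntimeContext().getState(
                        new ValueStateDescriptor<>("balance", Types.DOUBLE));
            }

            @Override
            public void processElement(Tuple2<Long, Double> account, Context ctx)
                    throws Exception {
                balance.update(account.f1);
            }
        }
    }

Note that the uid passed to withOperator ("accounts-uid" above) is meant to be
the same uid the streaming job assigns to the corresponding operator; it is
what gets hashed into the OperatorID Jan asks about, so the bootstrapped state
is only picked up on restore if the uid strings match.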

Re: [Discuss] FLIP-43: Savepoint Connector

Posted by Xiaowei Jiang <xi...@gmail.com>.
Hi Gordon & Seth, this looks like a very useful feature for analyzing and managing state.

I agree that using DataSet is probably the most practical choice right now. But in the longer term, adding Table API support for this would be nice.

When analyzing the savepoint, I assume that the state backend restores the state first? This approach is generic and works for any state backend. However, sometimes it may be more efficient to analyze the files on DFS directly without copying. We can probably add an interface to allow the state backend to optimize such behavior in the future.

Also a quick question on the example in the wiki:

    DataSet<MyPojo> keyedState = operator.readKeyedState("uid", new ReaderFunction());

Should operator.readKeyedState be replaced with savepoint.readKeyedState here?

Regards,
Xiaowei
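
For readers following the wiki example Xiaowei mentions, the read side would
look roughly like the sketch below. This is only an illustration: it assumes
the corrected savepoint.readKeyedState form, the uid and state names are made
up, and a Tuple2 stands in for the MyPojo placeholder so the snippet is
self-contained. Class and package names may differ from the final API.

    // A reader function re-declares the keyed state it wants to read and
    // emits one record per key found in the savepoint.
    public class BalanceReaderFunction
            extends KeyedStateReaderFunction<Long, Tuple2<Long, Double>> {

        private transient ValueState<Double> balance;

        @Override
        public void open(Configuration parameters) {
            balance = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("balance", Types.DOUBLE));
        }

        @Override
        public void readKey(Long key, Context ctx, Collector<Tuple2<Long, Double>> out)
                throws Exception {
            out.collect(Tuple2.of(key, balance.value()));
        }
    }

    // Usage (inside a main method): load an existing savepoint and read the
    // keyed state of one operator into a DataSet.
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    ExistingSavepoint savepoint =
            Savepoint.load(env, "hdfs:///path/to/savepoint", new MemoryStateBackend());
    DataSet<Tuple2<Long, Double>> keyedState =
            savepoint.readKeyedState("accounts-uid", new BalanceReaderFunction());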


Re: [Discuss] FLIP-43: Savepoint Connector

Posted by Aljoscha Krettek <al...@apache.org>.
+1 I think this is a very valuable new addition and we should try not to get stuck on trying to design the perfect solution for everything



Re: [Discuss] FLIP-43: Savepoint Connector

Posted by "Tzu-Li (Gordon) Tai" <tz...@apache.org>.
+1 to renaming it as State Processing API and adding it under the
flink-libraries module.

I also think we can start with the development of the feature. From the
feedback so far, it seems like we're in a good spot to add in at least the
initial version of this API, hopefully making it ready for 1.9.0.

Cheers,
Gordon
