Posted to user@flink.apache.org by Mark Niehe <ma...@segment.com> on 2020/02/18 23:02:46 UTC

State Processor API Keyed State

Hey all,

I've run into an issue with the State Processor API. To highlight the
issues I've been having, I've created a reference repository that will
demonstrate the issue (repository:
https://github.com/segmentio/flink-state-management).

The current implementation of the pipeline has left us with keyed state
that we no longer need, and we don't have references to some of the old keys.
My plan was to:
1. create a savepoint
2. read the keys from each operator (using State Processor API)
3. filter out all the keys that are no longer used
4. bootstrap a new savepoint that contains the filtered state (a rough sketch of steps 2-4 follows below)
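
[Editor's note] To make the plan concrete, here is a minimal, untested sketch of steps 2-4 against the DataSet-based State Processor API (Flink 1.9/1.10), using a plain String key since that is the simple case that worked. The operator uid "my-operator-uid", the "my-state" ValueState, the savepoint paths and the keysToKeep set are hypothetical placeholders, not names from the real pipeline:

import org.apache.flink.api.common.functions.FilterFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.java.ExecutionEnvironment
import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.configuration.Configuration
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.state.api.{OperatorTransformation, Savepoint}
import org.apache.flink.state.api.functions.{KeyedStateBootstrapFunction, KeyedStateReaderFunction}
import org.apache.flink.util.Collector

// Hypothetical (key, state) pair read out of the operator.
case class Entry(key: String, value: String)

// Reads the hypothetical "my-state" ValueState for every key of the operator.
class ReaderFn extends KeyedStateReaderFunction[String, Entry] {
  @transient private var state: ValueState[String] = _
  override def open(parameters: Configuration): Unit =
    state = getRuntimeContext.getState(new ValueStateDescriptor[String]("my-state", classOf[String]))
  override def readKey(key: String, ctx: KeyedStateReaderFunction.Context, out: Collector[Entry]): Unit =
    out.collect(Entry(key, state.value()))
}

// Writes the surviving entries back into the same state descriptor.
class WriterFn extends KeyedStateBootstrapFunction[String, Entry] {
  @transient private var state: ValueState[String] = _
  override def open(parameters: Configuration): Unit =
    state = getRuntimeContext.getState(new ValueStateDescriptor[String]("my-state", classOf[String]))
  override def processElement(e: Entry, ctx: KeyedStateBootstrapFunction[String, Entry]#Context): Unit =
    state.update(e.value)
}

object PruneKeyedState {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val backend = new RocksDBStateBackend("file:///tmp/checkpoints") // hypothetical path
    val keysToKeep = Set("key-1", "key-2")                           // hypothetical allow-list of live keys

    // 2. read the keys (and their state) from one operator of the existing savepoint
    val savepoint = Savepoint.load(env, "file:///tmp/old-savepoint", backend)
    val entries = savepoint.readKeyedState("my-operator-uid", new ReaderFn)

    // 3. drop entries whose keys are no longer used
    val kept = entries.filter(new FilterFunction[Entry] {
      override def filter(e: Entry): Boolean = keysToKeep.contains(e.key)
    })

    // 4. bootstrap a new savepoint that contains only the filtered state
    val pruned = OperatorTransformation
      .bootstrapWith(kept)
      .keyBy(new KeySelector[Entry, String] { override def getKey(e: Entry): String = e.key })
      .transform(new WriterFn)

    savepoint
      .removeOperator("my-operator-uid")
      .withOperator("my-operator-uid", pruned)
      .write("file:///tmp/new-savepoint")

    // write() only declares the output; the batch job that actually writes
    // the new savepoint runs when the environment is executed.
    env.execute("prune unused keys")
  }
}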

I managed to get this working using a sample pipeline and a very basic key
(a string), but when I switched the key to be something more complex (a
case class of two strings), I started seeing this exception:
Caused by: org.apache.flink.util.StateMigrationException: The new key
serializer must be compatible.
at
org.apache.flink.contrib.streaming.state.restore.AbstractRocksDBRestoreOperation.readMetaData(AbstractRocksDBRestoreOperation.java:194)
at
org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBFullRestoreOperation.java:170)
at
org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBFullRestoreOperation.java:157)
at
org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restore(RocksDBFullRestoreOperation.java:141)
at
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:270)
... 13 more

Has anyone come across this before and figured out a fix? Any help you can
give would be greatly appreciated!

Thanks,
-- 
<http://segment.com/>
Mark Niehe ·  Software Engineer
Integrations
<https://segment.com/catalog?utm_source=signature&utm_medium=email>  ·  Blog
<https://segment.com/blog?utm_source=signature&utm_medium=email>  ·  We're
Hiring! <https://segment.com/jobs?utm_source=signature&utm_medium=email>

Re: State Processor API Keyed State

Posted by "Tzu-Li (Gordon) Tai" <tz...@apache.org>.
There might be a workaround for this, for now:

Basically, the trick is to explicitly tell the State Processor API which
type information to use to access the keyed state.
You can do that with the `ExistingSavepoint#readKeyedState(String uid,
KeyedStateReaderFunction function, TypeInformation<K> keyTypeInfo,
TypeInformation<OUT> outTypeInfo)` overload.
This lets the State Processor API bypass the Java type information
extraction process (which is not compatible with how type information is
derived in the Scala DataStream API right now, hence the
StateMigrationException you are getting).

What you'd have to do is, in your pipeline job, explicitly generate the
serializer / type information using either the Scala DataStream macro
`createTypeInformation` or a custom serializer.
Then, pass that serializer / type info when reading keyed state with the
State Processor API.
Simply put: you'll be specifying explicitly which serializer to use for the
keys, and telling the State Processor API to use that same serializer to
access the state.
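
[Editor's note] A minimal, untested sketch of this workaround, assuming a hypothetical two-string case-class key MyKey, a hypothetical "my-state" ValueState and operator uid "my-operator-uid"; the reader function mirrors the one sketched earlier in the thread, but keyed on the case class:

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.java.ExecutionEnvironment
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.configuration.Configuration
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.state.api.Savepoint
import org.apache.flink.state.api.functions.KeyedStateReaderFunction
import org.apache.flink.util.Collector

case class MyKey(a: String, b: String)            // hypothetical two-string case-class key
case class KeyedEntry(key: MyKey, value: String)  // hypothetical output record

class CaseClassReaderFn extends KeyedStateReaderFunction[MyKey, KeyedEntry] {
  @transient private var state: ValueState[String] = _
  override def open(parameters: Configuration): Unit =
    state = getRuntimeContext.getState(new ValueStateDescriptor[String]("my-state", classOf[String]))
  override def readKey(key: MyKey, ctx: KeyedStateReaderFunction.Context, out: Collector[KeyedEntry]): Unit =
    out.collect(KeyedEntry(key, state.value()))
}

object ReadWithExplicitTypeInfo {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val savepoint = Savepoint.load(env, "file:///tmp/old-savepoint",
      new RocksDBStateBackend("file:///tmp/checkpoints")) // hypothetical paths

    // The Scala macro produces the same type information (and hence the same
    // key serializer) that the Scala DataStream job used when it wrote the
    // savepoint ...
    val keyTypeInfo = createTypeInformation[MyKey]
    val outTypeInfo = createTypeInformation[KeyedEntry]

    // ... and passing it explicitly bypasses the Java type extraction that
    // otherwise produces an incompatible key serializer.
    val entries = savepoint.readKeyedState("my-operator-uid", new CaseClassReaderFn, keyTypeInfo, outTypeInfo)

    entries.print()
  }
}

This works because the Scala DataStream API itself derives the key's type information via the same `createTypeInformation` macro when you call keyBy, so reusing it on the read side makes the two key serializers line up.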

This is not nice, but it should work for now. It would be interesting to
hear how that works out for you.
As mentioned above, a possible ideal solution in the long run is to converge
the type information extraction for the Java / Scala DataStream APIs.

Cheers,
Gordon

On Wed, Feb 19, 2020 at 10:20 AM Tzu-Li (Gordon) Tai <tz...@apache.org>
wrote:

> Hi,
>
> Just to clarify -
> I quickly went through the README of the project, and saw this:
> "This error is seen after trying to read from a savepoint that was created
> using the same case class as a key."
>
> So, if I understood correctly, you were attempting to use the State
> Processor API to access a savepoint that was written with a Scala
> DataStream job, correct?
>
> If that's the case, I'm afraid this would not work as of now. See [1] for
> a similar scenario that others had also bumped into.
> TL;DR is - the State Processor API currently is not guaranteed to work for
> snapshots that are written with Scala DataStream jobs.
>
> For now, I'll add a big warning about this to the docs.
> But in general, it seems like we might want to consider bumping up the
> priority for enabling this, as quite a few users are using the Scala
> DataStream API for their jobs.
>
> Just as a side comment: this repo looks like a very interesting project!
>
> Cheers,
> Gordon
>
> [1] https://issues.apache.org/jira/browse/FLINK-15719
>
> On Wed, Feb 19, 2020 at 7:03 AM Mark Niehe <ma...@segment.com> wrote:
>
>> Hey all,
>>
>> I've run into an issue with the State Processor API. To highlight the
>> issues I've been having, I've created a reference repository that will
>> demonstrate the issue (repository:
>> https://github.com/segmentio/flink-state-management).
>>
>> The current implementation of the pipeline has left us with keyed state
>> that we no longer need, and we don't have references to some of the old keys.
>> My plan was to:
>> 1. create a savepoint
>> 2. read the keys from each operator (using State Processor API)
>> 3. filter out all the keys that are no longer used
>> 4. bootstrap a new savepoint that contains the filtered state
>>
>> I managed to get this working using a sample pipeline and a very basic
>> key (a string), but when I switched the key to be something more complex (a
>> case class of two strings), I started seeing this exception:
>> Caused by: org.apache.flink.util.StateMigrationException: The new key
>> serializer must be compatible.
>> at
>> org.apache.flink.contrib.streaming.state.restore.AbstractRocksDBRestoreOperation.readMetaData(AbstractRocksDBRestoreOperation.java:194)
>> at
>> org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBFullRestoreOperation.java:170)
>> at
>> org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBFullRestoreOperation.java:157)
>> at
>> org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restore(RocksDBFullRestoreOperation.java:141)
>> at
>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:270)
>> ... 13 more
>>
>> Has anyone come across this before and figured out a fix? Any help you
>> can give would be greatly appreciated!
>>
>> Thanks,
>> --
>> <http://segment.com/>
>> Mark Niehe ·  Software Engineer
>> Integrations
>> <https://segment.com/catalog?utm_source=signature&utm_medium=email>  ·
>> Blog <https://segment.com/blog?utm_source=signature&utm_medium=email>
>>   ·  We're Hiring!
>> <https://segment.com/jobs?utm_source=signature&utm_medium=email>
>>
>

Re: State Processor API Keyed State

Posted by "Tzu-Li (Gordon) Tai" <tz...@apache.org>.
Hi,

Just to clarify -
I quickly went through the README of the project, and saw this:
"This error is seen after trying to read from a savepoint that was created
using the same case class as a key."

So, if I understood correctly, you were attempting to use the State
Processor API to access a savepoint that was written with a Scala
DataStream job, correct?

If that's the case, I'm afraid this would not work as of now. See [1] for a
similar scenario that others had also bumped into.
TL;DR is - the State Processor API currently is not guaranteed to work for
snapshots that are written with Scala DataStream jobs.

For now, I'll add a big warning about this to the docs.
But in general, it seems like we might want to consider bumping up the
priority for enabling this, as quite a few users are using the Scala
DataStream API for their jobs.

Just as a side comment: this repo looks like a very interesting project!

Cheers,
Gordon

[1] https://issues.apache.org/jira/browse/FLINK-15719

On Wed, Feb 19, 2020 at 7:03 AM Mark Niehe <ma...@segment.com> wrote:

> Hey all,
>
> I've run into an issue with the State Processor API. To highlight the
> issues I've been having, I've created a reference repository that will
> demonstrate the issue (repository:
> https://github.com/segmentio/flink-state-management).
>
> The current implementation of the pipeline has left us with keyed state
> that we no longer need, and we don't have references to some of the old keys.
> My plan was to:
> 1. create a savepoint
> 2. read the keys from each operator (using State Processor API)
> 3. filter out all the keys that are no longer used
> 4. bootstrap a new savepoint that contains the filtered state
>
> I managed to get this working using a sample pipeline and a very basic key
> (a string), but when I switched the key to be something more complex (a
> case class of two strings), I started seeing this exception:
> Caused by: org.apache.flink.util.StateMigrationException: The new key
> serializer must be compatible.
> at
> org.apache.flink.contrib.streaming.state.restore.AbstractRocksDBRestoreOperation.readMetaData(AbstractRocksDBRestoreOperation.java:194)
> at
> org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBFullRestoreOperation.java:170)
> at
> org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBFullRestoreOperation.java:157)
> at
> org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restore(RocksDBFullRestoreOperation.java:141)
> at
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:270)
> ... 13 more
>
> Has anyone come across this before and figured out a fix? Any help you can
> give would be greatly appreciated!
>
> Thanks,
> --
> <http://segment.com/>
> Mark Niehe ·  Software Engineer
> Integrations
> <https://segment.com/catalog?utm_source=signature&utm_medium=email>  ·
> Blog <https://segment.com/blog?utm_source=signature&utm_medium=email>  ·  We're
> Hiring! <https://segment.com/jobs?utm_source=signature&utm_medium=email>
>