You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Filip Karnicki <fi...@gmail.com> on 2022/10/25 08:07:35 UTC

State Processor API - VoidNamespaceSerializer must be compatible with the old namespace serializer LongSerializer

Hi, I'm trying to load a list state using the State Processor API (Flink
1.14.3)

Cluster settings:

state.backend: rocksdb
state.backend.incremental: true
(...)


Code:

val env = ExecutionEnvironment.getExecutionEnvironment
val savepoint = Savepoint.load(env, pathToSavepoint, new
EmbeddedRocksDBStateBackend(true))

val tpe = new
MessageTypeInformation(MessageFactoryKey.forType(MessageFactoryType.WITH_PROTOBUF_PAYLOADS,
null) // using Flink Stateful Functions
val envelopeSerializer: TypeSerializer[Message] =
tpe.createSerializer(env.getConfig)
val listDescriptor = new
ListStateDescriptor[Message]("delayed-message-buffer",
envelopeSerializer.duplicate)

(...)
override def open(parameters: Configuration): Unit = {
    getRuntimeContext.getListState(listDescriptor) // fails with error [1]
}


Error [1]:

Caused by: java.io.IOException: Failed to restore timer state

            at
org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:177)

            at
org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:64)

            at
org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:183)

            at
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)

            at
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)

            at
org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)

            at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)

            at java.base/java.lang.Thread.run(Thread.java:829)

Caused by: java.lang.RuntimeException: Error while getting state

            at
org.apache.flink.runtime.state.DefaultKeyedStateStore.getListState(DefaultKeyedStateStore.java:74)

            at
org.apache.flink.state.api.runtime.SavepointRuntimeContext.getListState(SavepointRuntimeContext.java:213)

            at
x.x.x.x.x.myModule.StateReader$$anon$1.open(StateReader.scala:527)

            at
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)

            at
org.apache.flink.state.api.input.operator.StateReaderOperator.open(StateReaderOperator.java:106)

            at
org.apache.flink.state.api.input.operator.KeyedStateReaderOperator.open(KeyedStateReaderOperator.java:66)

            at
org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:174)

            ... 7 more

Caused by: org.apache.flink.util.StateMigrationException: The new namespace
serializer (org.apache.flink.runtime.state.VoidNamespaceSerializer@2806d6da)
must be compatible with the old namespace serializer (
org.apache.flink.api.common.typeutils.base.LongSerializer@52b06bef).

            at
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:685)

            at
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:624)

            at
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:837)

            at
org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)

            at
org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:73)

            at
org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:302)

            at
org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:353)

            at
org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:115)

            at
org.apache.flink.runtime.state.DefaultKeyedStateStore.getListState(
*DefaultKeyedStateStore*.java:71)

            ... 13 more




It seems that *DefaultKeyedStateStore *always wants to use
VoidNamespaceSerializer.INSTANCE despite my state being created with a
LongSerializer namespace serializer.


Is there anything anyone can immediately see me doing wrong?


Thank you

Fil

RE: State Processor API - VoidNamespaceSerializer must be compatible with the old namespace serializer LongSerializer

Posted by Schwalbe Matthias <Ma...@viseca.ch>.
Hi Filip, Hi Tsu-Li,

@Tsu-Li: long time not seen 😊 (it is time for an on-site FlinkForward in Berlin again next year 😊 )

Considering Tsu-Li’s proposal, there is a restriction, at the time being you can only create a HybridSource from other sources that have exactly the same type.
This is not always feasible given that Flip-27 sources have no means of projecting to a specific type within the source.
E.g. a flat-file HDFS source of String won’t match with a on-line Kafka source of some AVRO formatted type … to get this working is really tricky.

There is another idea that comes to mind (i.e. not verified / tested):

  *   Implement a MultipleInputOperator with InputSelectable (example see [1])
  *   Always select the bounded input until that one is finished,
  *   The other input(s) will backpressure until getting selected

A third idea, that worked before the existence of MultipleInputOperator was to

  *   Union the streams and buffer incoming records in keyed state (e.g. map state of timestamp -> List[event]), and
  *   By means of timers yield them only once the watermark passes by the stored timestamp
  *   State backend needs to be RocksDB for this because you can iterate MapState by Key-order (= timestamp), this does not work well for the other state backends
     *   @Tsu-Li: I remember well when you demonstrated this trick on the conference a couple of years ago
  *   The problem with this is, that you collect lots of state because of watermark-skew among the two input stream
  *   This can be remedied by restricting watermark-skew in job configuration [2]

@Filip: feel free to get back to us for help with getting this set-up …

Sincere grettings


Thias



[1] org.apache.flink.streaming.api.graph.StreamGraphGeneratorBatchExecutionTest.InputSelectableMultipleInputOperator
[2] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#watermark-alignment-_beta_

From: Tzu-Li (Gordon) Tai <tz...@apache.org>
Sent: Wednesday, October 26, 2022 6:59 PM
To: Filip Karnicki <fi...@gmail.com>
Cc: Schwalbe Matthias <Ma...@viseca.ch>; user <us...@flink.apache.org>
Subject: Re: State Processor API - VoidNamespaceSerializer must be compatible with the old namespace serializer LongSerializer

⚠EXTERNAL MESSAGE – CAUTION: Think Before You Click ⚠


Hi Filip,

I think what you are seeing is expected. The State Processor API was intended to allow access only to commonly used user-facing state structures, while Stateful Functions uses quite a bit of Flink internal features, including for its state maintenance.
The list state in question in StateFun's FunctionGroupOperator is an internal kind of state normally used in the context of Flink window states that are namespaced. Normal user-facing list states are not namespaced.

Just curious, which specific state in FunctionGroupOperator are you trying to transform? I assume all other internal state in FunctionGroupOperator you want to remain untouched, and only wish to carry them over to be included in the transformed savepoint?

Thanks,
Gordon


On Wed, Oct 26, 2022 at 3:50 AM Filip Karnicki <fi...@gmail.com>> wrote:
Hi Thias

Thank you for your reply. I can re-create a simplified use case at home and stick it on github if you think it will help.

What I'm trying to access is pretty internal to Flink Stateful Functions. It seems that a custom operator (https://github.com/apache/flink-statefun/blob/09a5cba521e9f994896c746ec9f8cc6479403612/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/FunctionGroupOperator.java#L188) is accessing a KeyedStateBackend and creating an InternalListState, which I'm not sure I'll be able to get my hands on using the State Processor API.

The only reason why I need to get my hands on all the states from this Stateful Functions operator is because later I (think I) have to use .removeOperator(uid) on a savepoint and replace it .withOperator(uid, myTransformation) in order to transform my own, non-stateful-functions keyed state which also belongs to this operator.

Kind regards
Fil

On Tue, 25 Oct 2022 at 16:24, Schwalbe Matthias <Ma...@viseca.ch>> wrote:
Hi Filip,

It looks like, your state primitive is used in the context of Windows:
Keyed state works like this:
·         It uses a cascade of key types to store and retrieve values:
o    The key (set by .keyBy)
o    A namespace (usually a VoidNamespace), unless it is used in context of a specific window
o    An optional key of the state primitive (if it is a MapState)

In your case the state primitive is (probably) declared in the context of a window and hence when loading the state by means of StateProcessorAPI you also need to specify the correct Namespace TypeInformation.
If I am in doubt, how a state primitive is set up, I let the debugger stop in a process function and walk up the call stack to find the proper components implementing it.

If you share a little more of your code it is much easier to provide specific guidance 😊
(e.g. ‘savepoint’ is never used again in your code snippet …)

Sincere greeting

Thias



From: Filip Karnicki <fi...@gmail.com>>
Sent: Tuesday, October 25, 2022 10:08 AM
To: user <us...@flink.apache.org>>
Subject: State Processor API - VoidNamespaceSerializer must be compatible with the old namespace serializer LongSerializer

Hi, I'm trying to load a list state using the State Processor API (Flink 1.14.3)

Cluster settings:


state.backend: rocksdb

state.backend.incremental: true

(...)

Code:

val env = ExecutionEnvironment.getExecutionEnvironment
val savepoint = Savepoint.load(env, pathToSavepoint, new EmbeddedRocksDBStateBackend(true))

val tpe = new MessageTypeInformation(MessageFactoryKey.forType(MessageFactoryType.WITH_PROTOBUF_PAYLOADS, null) // using Flink Stateful Functions
val envelopeSerializer: TypeSerializer[Message] = tpe.createSerializer(env.getConfig)
val listDescriptor = new ListStateDescriptor[Message]("delayed-message-buffer", envelopeSerializer.duplicate)

(...)
override def open(parameters: Configuration): Unit = {
    getRuntimeContext.getListState(listDescriptor) // fails with error [1]
}


Error [1]:

Caused by: java.io.IOException: Failed to restore timer state
            at org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:177)
            at org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:64)
            at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:183)
            at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
            at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
            at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
            at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
            at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.RuntimeException: Error while getting state
            at org.apache.flink.runtime.state.DefaultKeyedStateStore.getListState(DefaultKeyedStateStore.java:74)
            at org.apache.flink.state.api.runtime.SavepointRuntimeContext.getListState(SavepointRuntimeContext.java:213)
            at x.x.x.x.x.myModule.StateReader$$anon$1.open(StateReader.scala:527)
            at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
            at org.apache.flink.state.api.input.operator.StateReaderOperator.open(StateReaderOperator.java:106)
            at org.apache.flink.state.api.input.operator.KeyedStateReaderOperator.open(KeyedStateReaderOperator.java:66)
            at org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:174)
            ... 7 more
Caused by: org.apache.flink.util.StateMigrationException: The new namespace serializer (org.apache.flink.runtime.state.VoidNamespaceSerializer@2806d6da<ma...@2806d6da>) must be compatible with the old namespace serializer (org.apache.flink.api.common.typeutils.base.LongSerializer@52b06bef<ma...@52b06bef>).
            at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:685)
            at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:624)
            at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:837)
            at org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
            at org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:73)
            at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:302)
            at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:353)
            at org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:115)
            at org.apache.flink.runtime.state.DefaultKeyedStateStore.getListState(DefaultKeyedStateStore.java:71)
            ... 13 more



It seems that DefaultKeyedStateStore always wants to use VoidNamespaceSerializer.INSTANCE despite my state being created with a LongSerializer namespace serializer.

Is there anything anyone can immediately see me doing wrong?

Thank you
Fil
Diese Nachricht ist ausschliesslich für den Adressaten bestimmt und beinhaltet unter Umständen vertrauliche Mitteilungen. Da die Vertraulichkeit von e-Mail-Nachrichten nicht gewährleistet werden kann, übernehmen wir keine Haftung für die Gewährung der Vertraulichkeit und Unversehrtheit dieser Mitteilung. Bei irrtümlicher Zustellung bitten wir Sie um Benachrichtigung per e-Mail und um Löschung dieser Nachricht sowie eventueller Anhänge. Jegliche unberechtigte Verwendung oder Verbreitung dieser Informationen ist streng verboten.

This message is intended only for the named recipient and may contain confidential or privileged information. As the confidentiality of email communication cannot be guaranteed, we do not accept any responsibility for the confidentiality and the intactness of this message. If you have received it in error, please advise the sender by return e-mail and delete this message and any attachments. Any unauthorised use or dissemination of this information is strictly prohibited.
Diese Nachricht ist ausschliesslich für den Adressaten bestimmt und beinhaltet unter Umständen vertrauliche Mitteilungen. Da die Vertraulichkeit von e-Mail-Nachrichten nicht gewährleistet werden kann, übernehmen wir keine Haftung für die Gewährung der Vertraulichkeit und Unversehrtheit dieser Mitteilung. Bei irrtümlicher Zustellung bitten wir Sie um Benachrichtigung per e-Mail und um Löschung dieser Nachricht sowie eventueller Anhänge. Jegliche unberechtigte Verwendung oder Verbreitung dieser Informationen ist streng verboten.

This message is intended only for the named recipient and may contain confidential or privileged information. As the confidentiality of email communication cannot be guaranteed, we do not accept any responsibility for the confidentiality and the intactness of this message. If you have received it in error, please advise the sender by return e-mail and delete this message and any attachments. Any unauthorised use or dissemination of this information is strictly prohibited.

Re: State Processor API - VoidNamespaceSerializer must be compatible with the old namespace serializer LongSerializer

Posted by Filip Karnicki <fi...@gmail.com>.
Hi Gordon

I would like to carry over *all* of the internal states for the
FunctionGroupOperator, only changing my own state. I was under the
impression that the only way to do that is to call

mySvepoint.removeOperator("operator-uid").withOperator("operator-uid",
transformation)

(where transformation is):
val transformation =
OperatorTransformation.bootstrapWith(myKeyedState).keyBy((ks: KeyedState)
=> ks.key).transform(new KeyedStateBootstrapFunction[String,
MyKeyedStateClass] (... // here use the values of MyKeyedStateClass to
re-create the internal statefun state)

meaning that I'd lose all the internal state in "operator-uid" unless I
read it in first, and populated it back in the `transformation`. Am I
thinking about this all wrong?

Many thanks
Fil

(Thias, I'm assuming your answer around HybridSource is to another thread,
so I'm removing it from this reply)

On Wed, 26 Oct 2022 at 19:01, Tzu-Li (Gordon) Tai <tz...@apache.org>
wrote:

> Hi Filip,
>
> I think what you are seeing is expected. The State Processor API was
> intended to allow access only to commonly used user-facing state
> structures, while Stateful Functions uses quite a bit of Flink internal
> features, including for its state maintenance.
> The list state in question in StateFun's FunctionGroupOperator is an
> internal kind of state normally used in the context of Flink window states
> that are namespaced. Normal user-facing list states are not namespaced.
>
> Just curious, which specific state in FunctionGroupOperator are you trying
> to transform? I assume all other internal state in FunctionGroupOperator
> you want to remain untouched, and only wish to carry them over to be
> included in the transformed savepoint?
>
> Thanks,
> Gordon
>
>
> On Wed, Oct 26, 2022 at 3:50 AM Filip Karnicki <fi...@gmail.com>
> wrote:
>
>> Hi Thias
>>
>> Thank you for your reply. I can re-create a simplified use case at home
>> and stick it on github if you think it will help.
>>
>> What I'm trying to access is pretty internal to Flink Stateful Functions.
>> It seems that a custom operator (
>> https://github.com/apache/flink-statefun/blob/09a5cba521e9f994896c746ec9f8cc6479403612/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/FunctionGroupOperator.java#L188)
>> is accessing a KeyedStateBackend and creating an InternalListState, which
>> I'm not sure I'll be able to get my hands on using the State Processor API.
>>
>> The only reason why I need to get my hands on all the states from this
>> Stateful Functions operator is because later I (think I) have to use
>> .removeOperator(uid) on a savepoint and replace it .withOperator(uid,
>> myTransformation) in order to transform my own, non-stateful-functions
>> keyed state which also belongs to this operator.
>>
>> Kind regards
>> Fil
>>
>> On Tue, 25 Oct 2022 at 16:24, Schwalbe Matthias <
>> Matthias.Schwalbe@viseca.ch> wrote:
>>
>>> Hi Filip,
>>>
>>>
>>>
>>> It looks like, your state primitive is used in the context of Windows:
>>>
>>> Keyed state works like this:
>>>
>>>    - It uses a cascade of key types to store and retrieve values:
>>>       - The key (set by .keyBy)
>>>       - A namespace (usually a VoidNamespace), unless it is used in
>>>       context of a specific window
>>>       - An optional key of the state primitive (if it is a MapState)
>>>
>>>
>>>
>>> In your case the state primitive is (probably) declared in the context
>>> of a window and hence when loading the state by means of StateProcessorAPI
>>> you also need to specify the correct Namespace TypeInformation.
>>>
>>> If I am in doubt, how a state primitive is set up, I let the debugger
>>> stop in a process function and walk up the call stack to find the proper
>>> components implementing it.
>>>
>>>
>>>
>>> If you share a little more of your code it is much easier to provide
>>> specific guidance 😊
>>>
>>> (e.g. ‘savepoint’ is never used again in your code snippet …)
>>>
>>>
>>>
>>> Sincere greeting
>>>
>>>
>>>
>>> Thias
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> *From:* Filip Karnicki <fi...@gmail.com>
>>> *Sent:* Tuesday, October 25, 2022 10:08 AM
>>> *To:* user <us...@flink.apache.org>
>>> *Subject:* State Processor API - VoidNamespaceSerializer must be
>>> compatible with the old namespace serializer LongSerializer
>>>
>>>
>>>
>>> Hi, I'm trying to load a list state using the State Processor API (Flink
>>> 1.14.3)
>>>
>>>
>>>
>>> Cluster settings:
>>>
>>>
>>>
>>> state.backend: rocksdb
>>>
>>> state.backend.incremental: true
>>>
>>> (...)
>>>
>>>
>>>
>>> Code:
>>>
>>>
>>>
>>> val env = ExecutionEnvironment.getExecutionEnvironment
>>>
>>> val savepoint = Savepoint.load(env, pathToSavepoint, new
>>> EmbeddedRocksDBStateBackend(true))
>>>
>>>
>>> val tpe = new
>>> MessageTypeInformation(MessageFactoryKey.forType(MessageFactoryType.WITH_PROTOBUF_PAYLOADS,
>>> null) // using Flink Stateful Functions
>>> val envelopeSerializer: TypeSerializer[Message] =
>>> tpe.createSerializer(env.getConfig)
>>>
>>> val listDescriptor = new
>>> ListStateDescriptor[Message]("delayed-message-buffer",
>>> envelopeSerializer.duplicate)
>>>
>>> (...)
>>> override def open(parameters: Configuration): Unit = {
>>>
>>>     getRuntimeContext.getListState(listDescriptor) // fails with error
>>> [1]
>>>
>>> }
>>>
>>>
>>>
>>>
>>>
>>> Error [1]:
>>>
>>>
>>>
>>> Caused by: java.io.IOException: Failed to restore timer state
>>>
>>>             at
>>> org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:177)
>>>
>>>             at
>>> org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:64)
>>>
>>>             at
>>> org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:183)
>>>
>>>             at
>>> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
>>>
>>>             at
>>> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
>>>
>>>             at
>>> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
>>>
>>>             at
>>> org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
>>>
>>>             at java.base/java.lang.Thread.run(Thread.java:829)
>>>
>>> Caused by: java.lang.RuntimeException: Error while getting state
>>>
>>>             at
>>> org.apache.flink.runtime.state.DefaultKeyedStateStore.getListState(DefaultKeyedStateStore.java:74)
>>>
>>>             at
>>> org.apache.flink.state.api.runtime.SavepointRuntimeContext.getListState(SavepointRuntimeContext.java:213)
>>>
>>>             at
>>> x.x.x.x.x.myModule.StateReader$$anon$1.open(StateReader.scala:527)
>>>
>>>             at
>>> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
>>>
>>>             at
>>> org.apache.flink.state.api.input.operator.StateReaderOperator.open(StateReaderOperator.java:106)
>>>
>>>             at
>>> org.apache.flink.state.api.input.operator.KeyedStateReaderOperator.open(KeyedStateReaderOperator.java:66)
>>>
>>>             at
>>> org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:174)
>>>
>>>             ... 7 more
>>>
>>> Caused by: org.apache.flink.util.StateMigrationException: The new
>>> namespace serializer (
>>> org.apache.flink.runtime.state.VoidNamespaceSerializer@2806d6da) must
>>> be compatible with the old namespace serializer (
>>> org.apache.flink.api.common.typeutils.base.LongSerializer@52b06bef).
>>>
>>>             at
>>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:685)
>>>
>>>             at
>>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:624)
>>>
>>>             at
>>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:837)
>>>
>>>             at
>>> org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
>>>
>>>             at
>>> org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:73)
>>>
>>>             at
>>> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:302)
>>>
>>>             at
>>> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:353)
>>>
>>>             at
>>> org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:115)
>>>
>>>             at
>>> org.apache.flink.runtime.state.DefaultKeyedStateStore.getListState(
>>> *DefaultKeyedStateStore*.java:71)
>>>
>>>             ... 13 more
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> It seems that *DefaultKeyedStateStore *always wants to use
>>> VoidNamespaceSerializer.INSTANCE despite my state being created with a
>>> LongSerializer namespace serializer.
>>>
>>>
>>>
>>> Is there anything anyone can immediately see me doing wrong?
>>>
>>>
>>>
>>> Thank you
>>>
>>> Fil
>>> Diese Nachricht ist ausschliesslich für den Adressaten bestimmt und
>>> beinhaltet unter Umständen vertrauliche Mitteilungen. Da die
>>> Vertraulichkeit von e-Mail-Nachrichten nicht gewährleistet werden kann,
>>> übernehmen wir keine Haftung für die Gewährung der Vertraulichkeit und
>>> Unversehrtheit dieser Mitteilung. Bei irrtümlicher Zustellung bitten wir
>>> Sie um Benachrichtigung per e-Mail und um Löschung dieser Nachricht sowie
>>> eventueller Anhänge. Jegliche unberechtigte Verwendung oder Verbreitung
>>> dieser Informationen ist streng verboten.
>>>
>>> This message is intended only for the named recipient and may contain
>>> confidential or privileged information. As the confidentiality of email
>>> communication cannot be guaranteed, we do not accept any responsibility for
>>> the confidentiality and the intactness of this message. If you have
>>> received it in error, please advise the sender by return e-mail and delete
>>> this message and any attachments. Any unauthorised use or dissemination of
>>> this information is strictly prohibited.
>>>
>>

Re: State Processor API - VoidNamespaceSerializer must be compatible with the old namespace serializer LongSerializer

Posted by "Tzu-Li (Gordon) Tai" <tz...@apache.org>.
Hi Filip,

I think what you are seeing is expected. The State Processor API was
intended to allow access only to commonly used user-facing state
structures, while Stateful Functions uses quite a bit of Flink internal
features, including for its state maintenance.
The list state in question in StateFun's FunctionGroupOperator is an
internal kind of state normally used in the context of Flink window states
that are namespaced. Normal user-facing list states are not namespaced.

Just curious, which specific state in FunctionGroupOperator are you trying
to transform? I assume all other internal state in FunctionGroupOperator
you want to remain untouched, and only wish to carry them over to be
included in the transformed savepoint?

Thanks,
Gordon


On Wed, Oct 26, 2022 at 3:50 AM Filip Karnicki <fi...@gmail.com>
wrote:

> Hi Thias
>
> Thank you for your reply. I can re-create a simplified use case at home
> and stick it on github if you think it will help.
>
> What I'm trying to access is pretty internal to Flink Stateful Functions.
> It seems that a custom operator (
> https://github.com/apache/flink-statefun/blob/09a5cba521e9f994896c746ec9f8cc6479403612/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/FunctionGroupOperator.java#L188)
> is accessing a KeyedStateBackend and creating an InternalListState, which
> I'm not sure I'll be able to get my hands on using the State Processor API.
>
> The only reason why I need to get my hands on all the states from this
> Stateful Functions operator is because later I (think I) have to use
> .removeOperator(uid) on a savepoint and replace it .withOperator(uid,
> myTransformation) in order to transform my own, non-stateful-functions
> keyed state which also belongs to this operator.
>
> Kind regards
> Fil
>
> On Tue, 25 Oct 2022 at 16:24, Schwalbe Matthias <
> Matthias.Schwalbe@viseca.ch> wrote:
>
>> Hi Filip,
>>
>>
>>
>> It looks like, your state primitive is used in the context of Windows:
>>
>> Keyed state works like this:
>>
>>    - It uses a cascade of key types to store and retrieve values:
>>       - The key (set by .keyBy)
>>       - A namespace (usually a VoidNamespace), unless it is used in
>>       context of a specific window
>>       - An optional key of the state primitive (if it is a MapState)
>>
>>
>>
>> In your case the state primitive is (probably) declared in the context of
>> a window and hence when loading the state by means of StateProcessorAPI you
>> also need to specify the correct Namespace TypeInformation.
>>
>> If I am in doubt, how a state primitive is set up, I let the debugger
>> stop in a process function and walk up the call stack to find the proper
>> components implementing it.
>>
>>
>>
>> If you share a little more of your code it is much easier to provide
>> specific guidance 😊
>>
>> (e.g. ‘savepoint’ is never used again in your code snippet …)
>>
>>
>>
>> Sincere greeting
>>
>>
>>
>> Thias
>>
>>
>>
>>
>>
>>
>>
>> *From:* Filip Karnicki <fi...@gmail.com>
>> *Sent:* Tuesday, October 25, 2022 10:08 AM
>> *To:* user <us...@flink.apache.org>
>> *Subject:* State Processor API - VoidNamespaceSerializer must be
>> compatible with the old namespace serializer LongSerializer
>>
>>
>>
>> Hi, I'm trying to load a list state using the State Processor API (Flink
>> 1.14.3)
>>
>>
>>
>> Cluster settings:
>>
>>
>>
>> state.backend: rocksdb
>>
>> state.backend.incremental: true
>>
>> (...)
>>
>>
>>
>> Code:
>>
>>
>>
>> val env = ExecutionEnvironment.getExecutionEnvironment
>>
>> val savepoint = Savepoint.load(env, pathToSavepoint, new
>> EmbeddedRocksDBStateBackend(true))
>>
>>
>> val tpe = new
>> MessageTypeInformation(MessageFactoryKey.forType(MessageFactoryType.WITH_PROTOBUF_PAYLOADS,
>> null) // using Flink Stateful Functions
>> val envelopeSerializer: TypeSerializer[Message] =
>> tpe.createSerializer(env.getConfig)
>>
>> val listDescriptor = new
>> ListStateDescriptor[Message]("delayed-message-buffer",
>> envelopeSerializer.duplicate)
>>
>> (...)
>> override def open(parameters: Configuration): Unit = {
>>
>>     getRuntimeContext.getListState(listDescriptor) // fails with error [1]
>>
>> }
>>
>>
>>
>>
>>
>> Error [1]:
>>
>>
>>
>> Caused by: java.io.IOException: Failed to restore timer state
>>
>>             at
>> org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:177)
>>
>>             at
>> org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:64)
>>
>>             at
>> org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:183)
>>
>>             at
>> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
>>
>>             at
>> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
>>
>>             at
>> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
>>
>>             at
>> org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
>>
>>             at java.base/java.lang.Thread.run(Thread.java:829)
>>
>> Caused by: java.lang.RuntimeException: Error while getting state
>>
>>             at
>> org.apache.flink.runtime.state.DefaultKeyedStateStore.getListState(DefaultKeyedStateStore.java:74)
>>
>>             at
>> org.apache.flink.state.api.runtime.SavepointRuntimeContext.getListState(SavepointRuntimeContext.java:213)
>>
>>             at
>> x.x.x.x.x.myModule.StateReader$$anon$1.open(StateReader.scala:527)
>>
>>             at
>> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
>>
>>             at
>> org.apache.flink.state.api.input.operator.StateReaderOperator.open(StateReaderOperator.java:106)
>>
>>             at
>> org.apache.flink.state.api.input.operator.KeyedStateReaderOperator.open(KeyedStateReaderOperator.java:66)
>>
>>             at
>> org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:174)
>>
>>             ... 7 more
>>
>> Caused by: org.apache.flink.util.StateMigrationException: The new
>> namespace serializer (
>> org.apache.flink.runtime.state.VoidNamespaceSerializer@2806d6da) must be
>> compatible with the old namespace serializer (
>> org.apache.flink.api.common.typeutils.base.LongSerializer@52b06bef).
>>
>>             at
>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:685)
>>
>>             at
>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:624)
>>
>>             at
>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:837)
>>
>>             at
>> org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
>>
>>             at
>> org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:73)
>>
>>             at
>> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:302)
>>
>>             at
>> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:353)
>>
>>             at
>> org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:115)
>>
>>             at
>> org.apache.flink.runtime.state.DefaultKeyedStateStore.getListState(
>> *DefaultKeyedStateStore*.java:71)
>>
>>             ... 13 more
>>
>>
>>
>>
>>
>>
>>
>> It seems that *DefaultKeyedStateStore *always wants to use
>> VoidNamespaceSerializer.INSTANCE despite my state being created with a
>> LongSerializer namespace serializer.
>>
>>
>>
>> Is there anything anyone can immediately see me doing wrong?
>>
>>
>>
>> Thank you
>>
>> Fil
>> Diese Nachricht ist ausschliesslich für den Adressaten bestimmt und
>> beinhaltet unter Umständen vertrauliche Mitteilungen. Da die
>> Vertraulichkeit von e-Mail-Nachrichten nicht gewährleistet werden kann,
>> übernehmen wir keine Haftung für die Gewährung der Vertraulichkeit und
>> Unversehrtheit dieser Mitteilung. Bei irrtümlicher Zustellung bitten wir
>> Sie um Benachrichtigung per e-Mail und um Löschung dieser Nachricht sowie
>> eventueller Anhänge. Jegliche unberechtigte Verwendung oder Verbreitung
>> dieser Informationen ist streng verboten.
>>
>> This message is intended only for the named recipient and may contain
>> confidential or privileged information. As the confidentiality of email
>> communication cannot be guaranteed, we do not accept any responsibility for
>> the confidentiality and the intactness of this message. If you have
>> received it in error, please advise the sender by return e-mail and delete
>> this message and any attachments. Any unauthorised use or dissemination of
>> this information is strictly prohibited.
>>
>

Re: State Processor API - VoidNamespaceSerializer must be compatible with the old namespace serializer LongSerializer

Posted by Filip Karnicki <fi...@gmail.com>.
Hi Thias

Thank you for your reply. I can re-create a simplified use case at home and
stick it on github if you think it will help.

What I'm trying to access is pretty internal to Flink Stateful Functions.
It seems that a custom operator (
https://github.com/apache/flink-statefun/blob/09a5cba521e9f994896c746ec9f8cc6479403612/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/FunctionGroupOperator.java#L188)
is accessing a KeyedStateBackend and creating an InternalListState, which
I'm not sure I'll be able to get my hands on using the State Processor API.

The only reason why I need to get my hands on all the states from this
Stateful Functions operator is because later I (think I) have to use
.removeOperator(uid) on a savepoint and replace it .withOperator(uid,
myTransformation) in order to transform my own, non-stateful-functions
keyed state which also belongs to this operator.

Kind regards
Fil

On Tue, 25 Oct 2022 at 16:24, Schwalbe Matthias <Ma...@viseca.ch>
wrote:

> Hi Filip,
>
>
>
> It looks like, your state primitive is used in the context of Windows:
>
> Keyed state works like this:
>
>    - It uses a cascade of key types to store and retrieve values:
>       - The key (set by .keyBy)
>       - A namespace (usually a VoidNamespace), unless it is used in
>       context of a specific window
>       - An optional key of the state primitive (if it is a MapState)
>
>
>
> In your case the state primitive is (probably) declared in the context of
> a window and hence when loading the state by means of StateProcessorAPI you
> also need to specify the correct Namespace TypeInformation.
>
> If I am in doubt, how a state primitive is set up, I let the debugger stop
> in a process function and walk up the call stack to find the proper
> components implementing it.
>
>
>
> If you share a little more of your code it is much easier to provide
> specific guidance 😊
>
> (e.g. ‘savepoint’ is never used again in your code snippet …)
>
>
>
> Sincere greeting
>
>
>
> Thias
>
>
>
>
>
>
>
> *From:* Filip Karnicki <fi...@gmail.com>
> *Sent:* Tuesday, October 25, 2022 10:08 AM
> *To:* user <us...@flink.apache.org>
> *Subject:* State Processor API - VoidNamespaceSerializer must be
> compatible with the old namespace serializer LongSerializer
>
>
>
> Hi, I'm trying to load a list state using the State Processor API (Flink
> 1.14.3)
>
>
>
> Cluster settings:
>
>
>
> state.backend: rocksdb
>
> state.backend.incremental: true
>
> (...)
>
>
>
> Code:
>
>
>
> val env = ExecutionEnvironment.getExecutionEnvironment
>
> val savepoint = Savepoint.load(env, pathToSavepoint, new
> EmbeddedRocksDBStateBackend(true))
>
>
> val tpe = new
> MessageTypeInformation(MessageFactoryKey.forType(MessageFactoryType.WITH_PROTOBUF_PAYLOADS,
> null) // using Flink Stateful Functions
> val envelopeSerializer: TypeSerializer[Message] =
> tpe.createSerializer(env.getConfig)
>
> val listDescriptor = new
> ListStateDescriptor[Message]("delayed-message-buffer",
> envelopeSerializer.duplicate)
>
> (...)
> override def open(parameters: Configuration): Unit = {
>
>     getRuntimeContext.getListState(listDescriptor) // fails with error [1]
>
> }
>
>
>
>
>
> Error [1]:
>
>
>
> Caused by: java.io.IOException: Failed to restore timer state
>
>             at
> org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:177)
>
>             at
> org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:64)
>
>             at
> org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:183)
>
>             at
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
>
>             at
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
>
>             at
> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
>
>             at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
>
>             at java.base/java.lang.Thread.run(Thread.java:829)
>
> Caused by: java.lang.RuntimeException: Error while getting state
>
>             at
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getListState(DefaultKeyedStateStore.java:74)
>
>             at
> org.apache.flink.state.api.runtime.SavepointRuntimeContext.getListState(SavepointRuntimeContext.java:213)
>
>             at
> x.x.x.x.x.myModule.StateReader$$anon$1.open(StateReader.scala:527)
>
>             at
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
>
>             at
> org.apache.flink.state.api.input.operator.StateReaderOperator.open(StateReaderOperator.java:106)
>
>             at
> org.apache.flink.state.api.input.operator.KeyedStateReaderOperator.open(KeyedStateReaderOperator.java:66)
>
>             at
> org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:174)
>
>             ... 7 more
>
> Caused by: org.apache.flink.util.StateMigrationException: The new
> namespace serializer (
> org.apache.flink.runtime.state.VoidNamespaceSerializer@2806d6da) must be
> compatible with the old namespace serializer (
> org.apache.flink.api.common.typeutils.base.LongSerializer@52b06bef).
>
>             at
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:685)
>
>             at
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:624)
>
>             at
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:837)
>
>             at
> org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
>
>             at
> org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:73)
>
>             at
> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:302)
>
>             at
> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:353)
>
>             at
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:115)
>
>             at
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getListState(
> *DefaultKeyedStateStore*.java:71)
>
>             ... 13 more
>
>
>
>
>
>
>
> It seems that *DefaultKeyedStateStore *always wants to use
> VoidNamespaceSerializer.INSTANCE despite my state being created with a
> LongSerializer namespace serializer.
>
>
>
> Is there anything anyone can immediately see me doing wrong?
>
>
>
> Thank you
>
> Fil
> Diese Nachricht ist ausschliesslich für den Adressaten bestimmt und
> beinhaltet unter Umständen vertrauliche Mitteilungen. Da die
> Vertraulichkeit von e-Mail-Nachrichten nicht gewährleistet werden kann,
> übernehmen wir keine Haftung für die Gewährung der Vertraulichkeit und
> Unversehrtheit dieser Mitteilung. Bei irrtümlicher Zustellung bitten wir
> Sie um Benachrichtigung per e-Mail und um Löschung dieser Nachricht sowie
> eventueller Anhänge. Jegliche unberechtigte Verwendung oder Verbreitung
> dieser Informationen ist streng verboten.
>
> This message is intended only for the named recipient and may contain
> confidential or privileged information. As the confidentiality of email
> communication cannot be guaranteed, we do not accept any responsibility for
> the confidentiality and the intactness of this message. If you have
> received it in error, please advise the sender by return e-mail and delete
> this message and any attachments. Any unauthorised use or dissemination of
> this information is strictly prohibited.
>

RE: State Processor API - VoidNamespaceSerializer must be compatible with the old namespace serializer LongSerializer

Posted by Schwalbe Matthias <Ma...@viseca.ch>.
Hi Filip,

It looks like, your state primitive is used in the context of Windows:
Keyed state works like this:

  *   It uses a cascade of key types to store and retrieve values:
     *   The key (set by .keyBy)
     *   A namespace (usually a VoidNamespace), unless it is used in context of a specific window
     *   An optional key of the state primitive (if it is a MapState)

In your case the state primitive is (probably) declared in the context of a window and hence when loading the state by means of StateProcessorAPI you also need to specify the correct Namespace TypeInformation.
If I am in doubt, how a state primitive is set up, I let the debugger stop in a process function and walk up the call stack to find the proper components implementing it.

If you share a little more of your code it is much easier to provide specific guidance 😊
(e.g. ‘savepoint’ is never used again in your code snippet …)

Sincere greeting

Thias



From: Filip Karnicki <fi...@gmail.com>
Sent: Tuesday, October 25, 2022 10:08 AM
To: user <us...@flink.apache.org>
Subject: State Processor API - VoidNamespaceSerializer must be compatible with the old namespace serializer LongSerializer

Hi, I'm trying to load a list state using the State Processor API (Flink 1.14.3)

Cluster settings:


state.backend: rocksdb

state.backend.incremental: true

(...)

Code:

val env = ExecutionEnvironment.getExecutionEnvironment
val savepoint = Savepoint.load(env, pathToSavepoint, new EmbeddedRocksDBStateBackend(true))

val tpe = new MessageTypeInformation(MessageFactoryKey.forType(MessageFactoryType.WITH_PROTOBUF_PAYLOADS, null) // using Flink Stateful Functions
val envelopeSerializer: TypeSerializer[Message] = tpe.createSerializer(env.getConfig)
val listDescriptor = new ListStateDescriptor[Message]("delayed-message-buffer", envelopeSerializer.duplicate)

(...)
override def open(parameters: Configuration): Unit = {
    getRuntimeContext.getListState(listDescriptor) // fails with error [1]
}


Error [1]:

Caused by: java.io.IOException: Failed to restore timer state
            at org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:177)
            at org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:64)
            at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:183)
            at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
            at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
            at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
            at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
            at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.RuntimeException: Error while getting state
            at org.apache.flink.runtime.state.DefaultKeyedStateStore.getListState(DefaultKeyedStateStore.java:74)
            at org.apache.flink.state.api.runtime.SavepointRuntimeContext.getListState(SavepointRuntimeContext.java:213)
            at x.x.x.x.x.myModule.StateReader$$anon$1.open(StateReader.scala:527)
            at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
            at org.apache.flink.state.api.input.operator.StateReaderOperator.open(StateReaderOperator.java:106)
            at org.apache.flink.state.api.input.operator.KeyedStateReaderOperator.open(KeyedStateReaderOperator.java:66)
            at org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:174)
            ... 7 more
Caused by: org.apache.flink.util.StateMigrationException: The new namespace serializer (org.apache.flink.runtime.state.VoidNamespaceSerializer@2806d6da<ma...@2806d6da>) must be compatible with the old namespace serializer (org.apache.flink.api.common.typeutils.base.LongSerializer@52b06bef<ma...@52b06bef>).
            at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:685)
            at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:624)
            at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:837)
            at org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
            at org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:73)
            at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:302)
            at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:353)
            at org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:115)
            at org.apache.flink.runtime.state.DefaultKeyedStateStore.getListState(DefaultKeyedStateStore.java:71)
            ... 13 more



It seems that DefaultKeyedStateStore always wants to use VoidNamespaceSerializer.INSTANCE despite my state being created with a LongSerializer namespace serializer.

Is there anything anyone can immediately see me doing wrong?

Thank you
Fil
Diese Nachricht ist ausschliesslich für den Adressaten bestimmt und beinhaltet unter Umständen vertrauliche Mitteilungen. Da die Vertraulichkeit von e-Mail-Nachrichten nicht gewährleistet werden kann, übernehmen wir keine Haftung für die Gewährung der Vertraulichkeit und Unversehrtheit dieser Mitteilung. Bei irrtümlicher Zustellung bitten wir Sie um Benachrichtigung per e-Mail und um Löschung dieser Nachricht sowie eventueller Anhänge. Jegliche unberechtigte Verwendung oder Verbreitung dieser Informationen ist streng verboten.

This message is intended only for the named recipient and may contain confidential or privileged information. As the confidentiality of email communication cannot be guaranteed, we do not accept any responsibility for the confidentiality and the intactness of this message. If you have received it in error, please advise the sender by return e-mail and delete this message and any attachments. Any unauthorised use or dissemination of this information is strictly prohibited.