Posted to user@flink.apache.org by Avi Levi <av...@bluevoyant.com> on 2019/07/14 08:50:19 UTC

State incompatible

Hi,
I added a TTL to my state.
*Old version:*
private lazy val stateDescriptor = new ValueStateDescriptor("foo", Types.CASE_CLASS[DomainState])

*New version:*

@transient
private lazy val storeTtl = StateTtlConfig.newBuilder(90)
  .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
  .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
  .cleanupInRocksdbCompactFilter()
  .build()

  private lazy val stateDescriptor = {
    val des = new ValueStateDescriptor("foo", Types.CASE_CLASS[DomainState])
    des.enableTimeToLive(storeTtl)
    des
  }

*But when I try to restore from a savepoint, I get this error:*

java.lang.RuntimeException: Error while getting state
	at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:62)
	at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:144)
	...

Caused by: org.apache.flink.util.StateMigrationException: The new state serializer cannot be incompatible.
	at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:527)
	at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:475)
	at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:613)
	at org.apache.flink.runtime.state.ttl.TtlStateFactory.createTtlStateContext(TtlStateFactory.java:197)
	at org.apache.flink.runtime.state.ttl.TtlStateFactory.createValueState(TtlStateFactory.java:137)
	at org.apache.flink.runtime.state.ttl.TtlStateFactory.createState(TtlStateFactory.java:126)
	at org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:71)
	at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:286)
	at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:335)
	at org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124)
	at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60)
	... 11 more


Do you have any idea how I can resolve it?


Best wishes

Re: State incompatible

Posted by Avi Levi <av...@bluevoyant.com>.
Thanks Haibo,
bummer ;)

On Mon, Jul 15, 2019 at 12:27 PM Haibo Sun <su...@163.com> wrote:

> Hi,  Avi Levi
>
> I don't think there's any way to solve this problem right now; the Flink
> documentation states explicitly that this is not supported.
>
> "Trying to restore state, which was previously configured without TTL,
> using TTL enabled descriptor or vice versa will lead to compatibility
> failure and StateMigrationException."
>
> Flink Document:
> https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/state/state.html#state-time-to-live-ttl
>
> Best,
> Haibo

Re: State incompatible

Posted by Haibo Sun <su...@163.com>.
Hi, Avi Levi

I don't think there's any way to solve this problem right now; the Flink documentation states explicitly that this is not supported:

"Trying to restore state, which was previously configured without TTL, using TTL enabled descriptor or vice versa will lead to compatibility failure and StateMigrationException."

Flink documentation: https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/state/state.html#state-time-to-live-ttl

Best,
Haibo
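
Editorial note, not part of the thread: the failure follows from how TTL is stored. With TTL enabled, the backend keeps a last-modification timestamp next to each user value, so the serializer for "foo" no longer matches what the savepoint contains. One workaround that is sometimes used, and that nobody in this thread proposed, is to leave the old descriptor untouched and register the TTL-enabled state under a new name, copying values over lazily. The sketch below assumes the Flink 1.8 APIs. The name "foo-ttl", the shape of DomainState, the String key and input types, the counting logic, and the reading of "90" as 90 days are all hypothetical choices made for illustration.

```scala
import org.apache.flink.api.common.state.{StateTtlConfig, ValueState, ValueStateDescriptor}
import org.apache.flink.api.common.time.Time
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector

// Hypothetical stand-in for the DomainState case class from the original post.
case class DomainState(count: Long)

object Descriptors {
  // Same name and no TTL, exactly as in the savepoint, so restore stays compatible.
  def legacy: ValueStateDescriptor[DomainState] =
    new ValueStateDescriptor[DomainState]("foo", createTypeInformation[DomainState])

  // New name (assumption: "foo-ttl"), so the backend treats it as brand-new state.
  def withTtl: ValueStateDescriptor[DomainState] = {
    val ttl = StateTtlConfig
      .newBuilder(Time.days(90)) // assumption: the "90" in the post meant days
      .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
      .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
      .build() // cleanup strategy omitted here; its builder methods vary by Flink version
    val des = new ValueStateDescriptor[DomainState]("foo-ttl", createTypeInformation[DomainState])
    des.enableTimeToLive(ttl)
    des
  }
}

// Key and input types (String) are placeholders for whatever the real job uses.
class MigratingFunction extends KeyedProcessFunction[String, String, DomainState] {
  @transient private var legacy: ValueState[DomainState] = _
  @transient private var current: ValueState[DomainState] = _

  override def open(parameters: Configuration): Unit = {
    legacy = getRuntimeContext.getState(Descriptors.legacy)
    current = getRuntimeContext.getState(Descriptors.withTtl)
  }

  override def processElement(
      value: String,
      ctx: KeyedProcessFunction[String, String, DomainState]#Context,
      out: Collector[DomainState]): Unit = {
    // Prefer the TTL state; fall back to the restored legacy value for this key.
    val previous = Option(current.value()).orElse(Option(legacy.value()))
    val updated = DomainState(previous.map(_.count).getOrElse(0L) + 1)
    current.update(updated) // stored together with a TTL timestamp from now on
    legacy.clear()          // drop the old copy so it cannot be read again later
    out.collect(updated)
  }
}
```

One limitation of this approach: a key that never receives another element is never touched, so its entry in the old "foo" state stays in place without any expiry.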
