Posted to dev@flink.apache.org by Shi Quan <qu...@outlook.com> on 2019/05/06 06:44:06 UTC

IllegalArgumentException when using state with TTL enable and kryoSerializer as field Serializer

Hi,
Recently we encountered an IllegalArgumentException when using state with TTL enabled and KryoSerializer as the field serializer. Test setup:

  1.  Use heap state backend
  2.  Create a MapStateDescriptor from classes (String.class and HashMap.class)
  3.  Enable state TTL
  4.  Flink version: 1.6.1

Core test code:

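// HashMap.class has no dedicated serializer here, so the value falls back to KryoSerializer.
MapStateDescriptor<String, HashMap> mapStateDescriptor = new MapStateDescriptor<>("test", String.class, HashMap.class);
// ttlConfig is a StateTtlConfig built elsewhere in the test.
mapStateDescriptor.enableTimeToLive(ttlConfig);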

The IllegalArgumentException is thrown here because originalSerializers.length is 1:

protected CompositeSerializer<TtlValue<T>> createSerializerInstance(
   PrecomputedParameters precomputed,
   TypeSerializer<?> ... originalSerializers) {
   Preconditions.checkNotNull(originalSerializers);
   Preconditions.checkArgument(originalSerializers.length == 2);
   return new TtlSerializer<>(precomputed, (TypeSerializer<T>) originalSerializers[1]);
}


We found some clues in the Flink source code. Here is what happens, step by step:

Step 1. When the operator is opened, TtlStateFactory::createMapState creates the ttlDescriptor and, with it, a CompositeSerializer. Because KryoSerializer.duplicate() creates another KryoSerializer instance (see source code), CompositeSerializer.precomputed.stateful is set to “true”. Could someone tell me what stateful means in precomputed?
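
Our working assumption, sketched below with hypothetical stand-in classes (SharedSerializer, CopyingSerializer and isStateful are not Flink names), is that the flag records whether any field serializer returns a different instance from duplicate(). We have not confirmed that this is exactly how Flink computes it:

```java
// Hypothetical stand-ins, not Flink classes: just enough to illustrate the assumed rule.
class StatefulFlagSketch {
    interface Serializer {
        Serializer duplicate();
    }

    // Stateless: safe to share between threads, so duplicate() returns the same instance.
    static final class SharedSerializer implements Serializer {
        @Override public Serializer duplicate() { return this; }
    }

    // Stateful (like the Kryo case described above): duplicate() returns a new instance.
    static final class CopyingSerializer implements Serializer {
        @Override public Serializer duplicate() { return new CopyingSerializer(); }
    }

    // Assumed rule: the composite is stateful if duplicating any field
    // serializer yields a different object than the original.
    static boolean isStateful(Serializer... fields) {
        for (Serializer s : fields) {
            if (s.duplicate() != s) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(isStateful(new SharedSerializer(), new SharedSerializer()));
        System.out.println(isStateful(new SharedSerializer(), new CopyingSerializer()));
    }
}
```

Under this reading, a composite made only of shared serializers can return itself from duplicate(), while one that contains a Kryo-like serializer has to build a new instance each time.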

Step 2. When the StateTable is registered, newMetaInfo is created from the ttlDescriptor. The CompositeSerializer is duplicated in this step. Source code:

@Override
public CompositeSerializer<T> duplicate() {
   return precomputed.stateful ?
      createSerializerInstance(precomputed, duplicateFieldSerializers(fieldSerializers)) : this;
}



protected CompositeSerializer<TtlValue<T>> createSerializerInstance(
   PrecomputedParameters precomputed,
   TypeSerializer<?> ... originalSerializers) {
   Preconditions.checkNotNull(originalSerializers);
   Preconditions.checkArgument(originalSerializers.length == 2);
   return new TtlSerializer<>(precomputed, (TypeSerializer<T>) originalSerializers[1]);
}

We noticed that the new TtlSerializer contains only the field serializer (originalSerializers[1]); the LongSerializer for the timestamp is missing.

Step 3. The serializer is duplicated again when a snapshot is taken. Source code:

CopyOnWriteStateTableSnapshot(CopyOnWriteStateTable<K, N, S> owningStateTable) {

   super(owningStateTable);
   this.snapshotData = owningStateTable.snapshotTableArrays();
   this.snapshotVersion = owningStateTable.getStateTableVersion();
   this.numberOfEntriesInSnapshotData = owningStateTable.size();

   // We create duplicates of the serializers for the async snapshot, because TypeSerializer
   // might be stateful and shared with the event processing thread.
   this.localKeySerializer = owningStateTable.keyContext.getKeySerializer().duplicate();
   this.localNamespaceSerializer = owningStateTable.metaInfo.getNamespaceSerializer().duplicate();
   this.localStateSerializer = owningStateTable.metaInfo.getStateSerializer().duplicate();

   this.partitionedStateTableSnapshot = null;
}
Stack:

  *   new CopyOnWriteStateTableSnapshot
  *   CopyOnWriteStateTable.snapshot()
  *   HeapKeyedStateBackend$HeapSnapshotStrategy.processSnapshotMetaInfoForAllState()
  *   ……
  *   HeapKeyedStateBackend.snapshot()


The IllegalArgumentException is thrown when duplicating the CompositeSerializer created in Step 2, because it contains only one field serializer.
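
The sequence can be modeled without Flink. The sketch below uses hypothetical stand-in classes (LongLikeSerializer, KryoLikeSerializer, TtlLikeSerializer) and assumes the field order is timestamp first, user value second. It always duplicates instead of consulting a stateful flag, so it only illustrates the mechanism and is not the real implementation:

```java
// Hypothetical stand-ins for the classes named above; only the duplicate() logic is modeled.
class TtlDuplicateSketch {
    interface Serializer {
        Serializer duplicate();
    }

    // Stateless timestamp serializer: duplicate() returns itself.
    static final class LongLikeSerializer implements Serializer {
        @Override public Serializer duplicate() { return this; }
    }

    // Stateful user-value serializer: duplicate() returns a new instance.
    static final class KryoLikeSerializer implements Serializer {
        @Override public Serializer duplicate() { return new KryoLikeSerializer(); }
    }

    static final class TtlLikeSerializer implements Serializer {
        final Serializer[] fields;

        TtlLikeSerializer(Serializer... fields) { this.fields = fields; }

        @Override public TtlLikeSerializer duplicate() {
            Serializer[] copies = new Serializer[fields.length];
            for (int i = 0; i < fields.length; i++) {
                copies[i] = fields[i].duplicate();
            }
            return createInstance(copies);
        }

        // Mirrors the quoted createSerializerInstance: requires two serializers, forwards only one.
        static TtlLikeSerializer createInstance(Serializer... originals) {
            if (originals.length != 2) {
                throw new IllegalArgumentException(
                    "expected 2 field serializers, got " + originals.length);
            }
            return new TtlLikeSerializer(originals[1]);
        }
    }

    public static void main(String[] args) {
        // Step 1: the composite starts with two field serializers.
        TtlLikeSerializer step1 =
            new TtlLikeSerializer(new LongLikeSerializer(), new KryoLikeSerializer());
        // Step 2: the first duplicate succeeds but keeps only one field serializer.
        TtlLikeSerializer step2 = step1.duplicate();
        System.out.println("fields after first duplicate: " + step2.fields.length);
        // Step 3: the second duplicate fails the length check.
        try {
            step2.duplicate();
        } catch (IllegalArgumentException e) {
            System.out.println("second duplicate failed: " + e.getMessage());
        }
    }
}
```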

Did I describe the problem clearly? Is this a bug?
We compared KryoSerializer with PojoSerializer: PojoSerializer does not simply return a new serializer instance when duplicated; in most cases it returns the original instance. Which one has the bug, KryoSerializer or TtlStateFactory::createSerializerInstance?
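
If the fault lies in createSerializerInstance, one possible change would be to forward every duplicated field serializer instead of only the second one. The sketch below shows the idea with hypothetical stand-in classes (none of them are Flink's real ones), again assuming the field order is timestamp first, user value second. We have not tested this against the Flink code base:

```java
// Hypothetical stand-ins; this illustrates one possible fix, not Flink's actual code.
class TtlDuplicateFixSketch {
    interface Serializer {
        Serializer duplicate();
    }

    // Stateless timestamp serializer: duplicate() returns itself.
    static final class LongLikeSerializer implements Serializer {
        @Override public Serializer duplicate() { return this; }
    }

    // Stateful user-value serializer: duplicate() returns a new instance.
    static final class KryoLikeSerializer implements Serializer {
        @Override public Serializer duplicate() { return new KryoLikeSerializer(); }
    }

    static final class TtlLikeSerializer implements Serializer {
        final Serializer[] fields;

        TtlLikeSerializer(Serializer... fields) { this.fields = fields; }

        @Override public TtlLikeSerializer duplicate() {
            Serializer[] copies = new Serializer[fields.length];
            for (int i = 0; i < fields.length; i++) {
                copies[i] = fields[i].duplicate();
            }
            return createInstance(copies);
        }

        // Same length check as before, but both serializers are passed on.
        static TtlLikeSerializer createInstance(Serializer... originals) {
            if (originals.length != 2) {
                throw new IllegalArgumentException(
                    "expected 2 field serializers, got " + originals.length);
            }
            return new TtlLikeSerializer(originals);
        }
    }

    public static void main(String[] args) {
        TtlLikeSerializer s =
            new TtlLikeSerializer(new LongLikeSerializer(), new KryoLikeSerializer());
        // Repeated duplication now keeps both field serializers.
        TtlLikeSerializer copy = s.duplicate().duplicate().duplicate();
        System.out.println("fields after three duplicates: " + copy.fields.length);
    }
}
```

With both serializers forwarded, the length check keeps holding however many times the composite is duplicated, and the stateful field serializer is still copied each time.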

Best,
Quan Shi


Re: IllegalArgumentException when using state with TTL enable and kryoSerializer as field Serializer

Posted by Till Rohrmann <tr...@apache.org>.
Thanks for reporting this issue Quan. I've pulled in Andrey who developed
this feature and might shed some light on the problem.

Cheers,
Till

On Mon, May 6, 2019 at 11:04 AM Congxian Qiu <qc...@gmail.com> wrote:

> Hi Quan
> Is the problem still there when running on 1.8?  If there is still a
> problem when using 1.8, could you please share a minimal demo that
> reproduces it? Thanks
>
> Best, Congxian

Re: IllegalArgumentException when using state with TTL enable and kryoSerializer as field Serializer

Posted by Congxian Qiu <qc...@gmail.com>.
Hi Quan
Is the problem still there when running on 1.8?  If there is still a problem when using 1.8, could you please share a minimal demo that reproduces it? Thanks

Best, Congxian