Posted to issues@flink.apache.org by "binguo (Jira)" <ji...@apache.org> on 2021/01/25 09:27:00 UTC

[jira] [Updated] (FLINK-21121) TaggedOperatorSubtaskState is missing when creating a new savepoint using state processor api

     [ https://issues.apache.org/jira/browse/FLINK-21121?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

binguo updated FLINK-21121:
---------------------------
    Description: 
I get the following exception when using the Flink State Processor API to write a new savepoint:
{code:java}
java.lang.Exception: Exception while creating StreamOperatorStateContext.
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:204)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:247)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:473)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:469)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:522)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkException: Could not restore operator state backend for StreamSource_e8ea6e352a1a627513ffbd4573fa1628_(1/1) from any of the 1 provided restore options.
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:265)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:152)
    ... 9 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore operator state backend
    at org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:86)
    at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createOperatorStateBackend(RocksDBStateBackend.java:552)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$operatorStateBackend$0(StreamTaskStateInitializerImpl.java:256)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
    ... 11 more
Caused by: java.lang.IllegalStateException: Missing value for the key 'org.apache.flink.state.api.output.TaggedOperatorSubtaskState'
    at org.apache.flink.util.LinkedOptionalMap.unwrapOptionals(LinkedOptionalMap.java:190)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializerSnapshot.restoreSerializer(KryoSerializerSnapshot.java:86)
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
    at java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:545)
    at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
    at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:438)
    at org.apache.flink.api.common.typeutils.NestedSerializersSnapshotDelegate.snapshotsToRestoreSerializers(NestedSerializersSnapshotDelegate.java:225)
    at org.apache.flink.api.common.typeutils.NestedSerializersSnapshotDelegate.getRestoredNestedSerializers(NestedSerializersSnapshotDelegate.java:83)
    at org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot.restoreSerializer(CompositeTypeSerializerSnapshot.java:204)
    at org.apache.flink.runtime.state.StateSerializerProvider.previousSchemaSerializer(StateSerializerProvider.java:189)
    at org.apache.flink.runtime.state.StateSerializerProvider.currentSchemaSerializer(StateSerializerProvider.java:164)
    at org.apache.flink.runtime.state.RegisteredOperatorStateBackendMetaInfo.getPartitionStateSerializer(RegisteredOperatorStateBackendMetaInfo.java:113)
    at org.apache.flink.runtime.state.OperatorStateRestoreOperation.restore(OperatorStateRestoreOperation.java:94)
    at org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:83)
    ... 15 more
{code}
My Java code:
{code:java}
@Override
public void createNewSavepoint(ExecutionEnvironment env, String savepointPath, StateBackend stateBackend,
        ParameterTool config) {
    String savepointOutputPath = config.get(EapSavepointConstants.EAP_SAVEPOINT_OUTPUT_PATH);
    int maxParallelism = config.getInt(EapSavepointConstants.EAP_SAVEPOINT_MAX_PARALLELISM);
    Long windowTimeSize = config.getLong(EapSavepointConstants.WINDOW_TIME_SIZE);
    TumblingProcessingTimeWindows processTimeWindows =
            TumblingProcessingTimeWindows.of(Time.seconds(windowTimeSize));
    try {
        ExistingSavepoint existingSavepoint = Savepoint.load(env, savepointPath, stateBackend);

        // Read the Kafka source's union list state (topic partition -> offset).
        DataSet<Tuple2<KafkaTopicPartition, Long>> kafkaListState = existingSavepoint.readUnionState(
                OperatorUidAndNameConstants.KAFKA_SOURCE_UID, StateNameConstants.KAFKA_OFFSET_STATE_NAME,
                KafkaStateUtils.createTypeInformation(),
                KafkaStateUtils.createStateDescriptorSerializer(env.getConfig()));
        logger.info("Print kafka offset");
        kafkaListState.print();
        // kafkaTransformation is defined elsewhere and is not shown in this report.
        Savepoint.create(stateBackend, maxParallelism)
                .withOperator(OperatorUidAndNameConstants.KAFKA_SOURCE_UID, kafkaTransformation)
                .write(savepointOutputPath);
    } catch (IOException e) {
        logger.error("Savepoint load: " + e.getMessage());
        e.printStackTrace();
    } catch (Exception e) {
        logger.error("print state: " + e.getMessage());
        e.printStackTrace();
    }
}

 

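// KafkaStateUtils.java

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;

public class KafkaStateUtils {
    /**
     * Creates the state serializer for the Kafka topic partition to offset tuple.
     * An explicit state serializer with KryoSerializer is needed because otherwise
     * users cannot use the 'disableGenericTypes' property with KafkaConsumer.
     *
     * @param executionConfig execution config used to create the KryoSerializer
     * @return serializer for (KafkaTopicPartition, offset) tuples
     */
    public static TupleSerializer<Tuple2<KafkaTopicPartition, Long>> createStateDescriptorSerializer(
            ExecutionConfig executionConfig) {
        // An explicit serializer keeps compatibility with GenericTypeInformation
        // and allows users to call disableGenericTypes.
        TypeSerializer<?>[] fieldSerializers = new TypeSerializer<?>[]{
                new KryoSerializer<>(KafkaTopicPartition.class, executionConfig),
                LongSerializer.INSTANCE
        };
        @SuppressWarnings("unchecked")
        Class<Tuple2<KafkaTopicPartition, Long>> tupleClass = (Class<Tuple2<KafkaTopicPartition, Long>>) (Class<?>) Tuple2.class;
        return new TupleSerializer<>(tupleClass, fieldSerializers);
    }

    public static TypeInformation<Tuple2<KafkaTopicPartition, Long>> createTypeInformation() {
        return TypeInformation.of(new TypeHint<Tuple2<KafkaTopicPartition, Long>>() {});
    }
}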

{code}
After remote debugging, I found that the value for the key `org.apache.flink.state.api.output.TaggedOperatorSubtaskState` could not be parsed in `org.apache.flink.util.LinkedOptionalMapSerializer#readOptionalMap`.
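
To illustrate the failure mode behind the last `Caused by` in the trace, here is a minimal plain-Java sketch. It is not Flink's `LinkedOptionalMap`; the class `OptionalMapSketch` and its method are hypothetical stand-ins. It assumes only that an entry whose value could not be read back is kept as an absent value, and that unwrapping then rejects it:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

/** Simplified stand-in for the failing step; NOT Flink's LinkedOptionalMap. */
public class OptionalMapSketch {

    /** Unwraps every entry and fails on the first key whose value is absent. */
    public static <K, V> Map<K, V> unwrapOptionals(Map<K, Optional<V>> entries) {
        Map<K, V> unwrapped = new LinkedHashMap<>();
        for (Map.Entry<K, Optional<V>> entry : entries.entrySet()) {
            V value = entry.getValue().orElseThrow(() ->
                    new IllegalStateException("Missing value for the key '" + entry.getKey() + "'"));
            unwrapped.put(entry.getKey(), value);
        }
        return unwrapped;
    }

    public static void main(String[] args) {
        Map<String, Optional<String>> registrations = new LinkedHashMap<>();
        // An entry whose value was read back successfully.
        registrations.put("KafkaTopicPartition", Optional.of("resolved"));
        // Assumption for illustration: this value could not be parsed, so it is absent.
        registrations.put("org.apache.flink.state.api.output.TaggedOperatorSubtaskState", Optional.empty());
        try {
            unwrapOptionals(registrations);
        } catch (IllegalStateException e) {
            // The message has the same form as the one in the stack trace.
            System.out.println(e.getMessage());
        }
    }
}
```

In this sketch the key itself is still present after reading; only its value is missing, which is why the error names the key.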

 

I think `TaggedOperatorSubtaskState` should implement `CompositeStateHandle`. Any suggestions would be appreciated, thank you.


> TaggedOperatorSubtaskState is missing when creating a new savepoint using state processor api
> ---------------------------------------------------------------------------------------------
>
>                 Key: FLINK-21121
>                 URL: https://issues.apache.org/jira/browse/FLINK-21121
>             Project: Flink
>          Issue Type: Bug
>          Components: API / State Processor
>    Affects Versions: 1.11.0
>            Reporter: binguo
>            Priority: Major



--
This message was sent by Atlassian Jira
(v8.3.4#803005)