Posted to issues@flink.apache.org by "binguo (Jira)" <ji...@apache.org> on 2021/01/25 09:27:00 UTC
[jira] [Updated] (FLINK-21121) TaggedOperatorSubtaskState is missing when creating a new savepoint using state processor api
[ https://issues.apache.org/jira/browse/FLINK-21121?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
binguo updated FLINK-21121:
---------------------------
Description:
I get the following exception when using the Flink State Processor API to write a new savepoint:
{code:java}
java.lang.Exception: Exception while creating StreamOperatorStateContext.
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:204)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:247)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:473)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:469)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:522)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkException: Could not restore operator state backend for StreamSource_e8ea6e352a1a627513ffbd4573fa1628_(1/1) from any of the 1 provided restore options.
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:265)
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:152)
... 9 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore operator state backend
at org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:86)
at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createOperatorStateBackend(RocksDBStateBackend.java:552)
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$operatorStateBackend$0(StreamTaskStateInitializerImpl.java:256)
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
... 11 more
Caused by: java.lang.IllegalStateException: Missing value for the key 'org.apache.flink.state.api.output.TaggedOperatorSubtaskState'
at org.apache.flink.util.LinkedOptionalMap.unwrapOptionals(LinkedOptionalMap.java:190)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializerSnapshot.restoreSerializer(KryoSerializerSnapshot.java:86)
at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
at java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:545)
at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:438)
at org.apache.flink.api.common.typeutils.NestedSerializersSnapshotDelegate.snapshotsToRestoreSerializers(NestedSerializersSnapshotDelegate.java:225)
at org.apache.flink.api.common.typeutils.NestedSerializersSnapshotDelegate.getRestoredNestedSerializers(NestedSerializersSnapshotDelegate.java:83)
at org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot.restoreSerializer(CompositeTypeSerializerSnapshot.java:204)
at org.apache.flink.runtime.state.StateSerializerProvider.previousSchemaSerializer(StateSerializerProvider.java:189)
at org.apache.flink.runtime.state.StateSerializerProvider.currentSchemaSerializer(StateSerializerProvider.java:164)
at org.apache.flink.runtime.state.RegisteredOperatorStateBackendMetaInfo.getPartitionStateSerializer(RegisteredOperatorStateBackendMetaInfo.java:113)
at org.apache.flink.runtime.state.OperatorStateRestoreOperation.restore(OperatorStateRestoreOperation.java:94)
at org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:83)
... 15 more
{code}
My Java code:
{code:java}
@Override
public void createNewSavepoint(ExecutionEnvironment env, String savepointPath, StateBackend stateBackend,
        ParameterTool config) {
    String savepointOutputPath = config.get(EapSavepointConstants.EAP_SAVEPOINT_OUTPUT_PATH);
    int maxParallelism = config.getInt(EapSavepointConstants.EAP_SAVEPOINT_MAX_PARALLELISM);
    Long windowTimeSize = config.getLong(EapSavepointConstants.WINDOW_TIME_SIZE);
    TumblingProcessingTimeWindows processTimeWindows =
            TumblingProcessingTimeWindows.of(Time.seconds(windowTimeSize));
    try {
        ExistingSavepoint existingSavepoint = Savepoint.load(env, savepointPath, stateBackend);
        DataSet<Tuple2<KafkaTopicPartition, Long>> kafkaListState = existingSavepoint.readUnionState(
                OperatorUidAndNameConstants.KAFKA_SOURCE_UID, StateNameConstants.KAFKA_OFFSET_STATE_NAME,
                KafkaStateUtils.createTypeInformation(),
                KafkaStateUtils.createStateDescriptorSerializer(env.getConfig()));
        logger.info("Print kafka offset");
        kafkaListState.print();
        Savepoint.create(stateBackend, maxParallelism)
                .withOperator(OperatorUidAndNameConstants.KAFKA_SOURCE_UID, kafkaTransformation)
                .write(savepointOutputPath);
    } catch (IOException e) {
        logger.error("Savepoint load: " + e.getMessage());
        e.printStackTrace();
    } catch (Exception e) {
        logger.error("print state: " + e.getMessage());
        e.printStackTrace();
    }
}
// KafkaStateUtils.java
public class KafkaStateUtils {

    /**
     * Creates the state serializer for the Kafka topic-partition-to-offset tuple.
     * An explicit state serializer built on KryoSerializer is needed because otherwise
     * users cannot enable 'disableGenericTypes' together with the KafkaConsumer.
     *
     * @param executionConfig the execution config used to build the KryoSerializer
     * @return the tuple serializer for the Kafka offset state
     */
    public static TupleSerializer<Tuple2<KafkaTopicPartition, Long>> createStateDescriptorSerializer(
            ExecutionConfig executionConfig) {
        // The explicit serializer keeps compatibility with GenericTypeInformation
        // and allows users to disable generic types.
        TypeSerializer<?>[] fieldSerializers = new TypeSerializer<?>[] {
            new KryoSerializer<>(KafkaTopicPartition.class, executionConfig),
            LongSerializer.INSTANCE
        };
        @SuppressWarnings("unchecked")
        Class<Tuple2<KafkaTopicPartition, Long>> tupleClass =
                (Class<Tuple2<KafkaTopicPartition, Long>>) (Class<?>) Tuple2.class;
        return new TupleSerializer<>(tupleClass, fieldSerializers);
    }

    public static TypeInformation<Tuple2<KafkaTopicPartition, Long>> createTypeInformation() {
        return TypeInformation.of(new TypeHint<Tuple2<KafkaTopicPartition, Long>>() {});
    }
}
{code}
After remote debugging, I found that the value for the key `org.apache.flink.state.api.output.TaggedOperatorSubtaskState` could not be resolved in `org.apache.flink.util.LinkedOptionalMapSerializer#readOptionalMap`.
Personally, I think `TaggedOperatorSubtaskState` should implement `CompositeStateHandle`; any suggestions would be appreciated, thank you.
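For context on the final cause in the stack trace: `LinkedOptionalMap.unwrapOptionals` throws `IllegalStateException` when a key recorded in the serializer snapshot has no resolvable value at restore time. The sketch below is a simplified, self-contained model of that failure mode (it is not Flink's actual `LinkedOptionalMap` implementation; the class and method names here are illustrative only):

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

/** Simplified model of the LinkedOptionalMap#unwrapOptionals failure mode. */
public class UnwrapDemo {

    /**
     * Converts a map of optionals into a plain map, throwing if any key is
     * present without a value -- mirroring the reported exception message.
     */
    static <V> Map<String, V> unwrapOptionals(Map<String, Optional<V>> map) {
        Map<String, V> result = new LinkedHashMap<>();
        for (Map.Entry<String, Optional<V>> e : map.entrySet()) {
            result.put(e.getKey(), e.getValue().orElseThrow(() ->
                    new IllegalStateException(
                            "Missing value for the key '" + e.getKey() + "'")));
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Optional<Class<?>>> registered = new LinkedHashMap<>();
        // A class name recorded in the snapshot whose class could not be
        // resolved at restore time -- the value is absent:
        registered.put(
                "org.apache.flink.state.api.output.TaggedOperatorSubtaskState",
                Optional.empty());
        try {
            unwrapOptionals(registered);
            System.out.println("unexpected: no exception");
        } catch (IllegalStateException ex) {
            System.out.println(ex.getMessage());
        }
    }
}
```

In the real stack trace, the absent value corresponds to `KryoSerializerSnapshot` failing to resolve the registered class during restore, which is what makes the operator state backend build fail.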
> TaggedOperatorSubtaskState is missing when creating a new savepoint using state processor api
> ---------------------------------------------------------------------------------------------
>
> Key: FLINK-21121
> URL: https://issues.apache.org/jira/browse/FLINK-21121
> Project: Flink
> Issue Type: Bug
> Components: API / State Processor
> Affects Versions: 1.11.0
> Reporter: binguo
> Priority: Major
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)