You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flink.apache.org by "boshu Zheng (JIRA)" <ji...@apache.org> on 2018/12/06 06:22:00 UTC
[jira] [Created] (FLINK-11083) CRowSerializerConfigSnapshot is not
instantiable
boshu Zheng created FLINK-11083:
-----------------------------------
Summary: CRowSerializerConfigSnapshot is not instantiable
Key: FLINK-11083
URL: https://issues.apache.org/jira/browse/FLINK-11083
Project: Flink
Issue Type: Bug
Components: Table API & SQL, Type Serialization System
Reporter: boshu Zheng
Assignee: boshu Zheng
An exception was encountered when restarting a job with savepoint in our production env,
{code:java}
2018-12-04 20:28:25,091 INFO 10595 org.apache.flink.runtime.taskmanager.Task :917 - _OurCustomOperator_ -> select: () -> to: Tuple2 -> Sink: Unnamed (3/20) (61c4fa7339bf152157e8e1dd0f8fd97b) switched from RUNNING to FAILED.
java.lang.Exception: Exception while creating StreamOperatorStateContext.
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:192)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:227)
at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flink.util.FlinkException: Could not restore operator state backend for AsyncWaitOperator_90bea66de1c231edf33913ecd54406c1_(3/20) from any of the 1 provided restore options.
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:242)
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:140)
... 5 more
Caused by: java.lang.RuntimeException: The class 'org.apache.flink.table.runtime.types.CRowSerializer$CRowSerializerConfigSnapshot' is not instantiable: The class has no (implicit) public nullary constructor, i.e. a constructor without arguments.
at org.apache.flink.util.InstantiationUtil.checkForInstantiation(InstantiationUtil.java:412)
at org.apache.flink.util.InstantiationUtil.instantiate(InstantiationUtil.java:337)
at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerConfigSnapshotSerializationProxy.read(TypeSerializerSerializationUtil.java:433)
at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializerConfigSnapshot(TypeSerializerSerializationUtil.java:255)
at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(TypeSerializerSerializationUtil.java:211)
at org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot.read(CompositeTypeSerializerConfigSnapshot.java:71)
at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerConfigSnapshotSerializationProxy.read(TypeSerializerSerializationUtil.java:435)
at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializerConfigSnapshot(TypeSerializerSerializationUtil.java:255)
at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(TypeSerializerSerializationUtil.java:211)
at org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshotReadersWriters$CurrentReaderImpl.readStateMetaInfoSnapshot(StateMetaInfoSnapshotReadersWriters.java:218)
at org.apache.flink.runtime.state.OperatorBackendSerializationProxy.read(OperatorBackendSerializationProxy.java:105)
at org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:505)
at org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:64)
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:151)
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:123)
... 7 more
{code}
I add tests to CRowSerializerTest to make sure this is definitely a bug,
{code:java}
@Test
def testDefaultConstructor(): Unit = {
new CRowSerializer.CRowSerializerConfigSnapshot()
/////// This would fail the test
val serializerConfigSnapshotClass =
Class.forName("org.apache.flink.table.runtime.types.CRowSerializer$CRowSerializerConfigSnapshot")
InstantiationUtil.instantiate(serializerConfigSnapshotClass)
}
@Test
def testStateRestore(): Unit = {
class IKeyedProcessFunction extends KeyedProcessFunction[Integer, Integer, Integer] {
var state: ListState[CRow] = _
override def open(parameters: Configuration): Unit = {
val stateDesc = new ListStateDescriptor[CRow]("CRow",
new CRowTypeInfo(new RowTypeInfo(Types.INT)))
state = getRuntimeContext.getListState(stateDesc)
}
override def processElement(value: Integer,
ctx: KeyedProcessFunction[Integer, Integer, Integer]#Context,
out: Collector[Integer]): Unit = {
state.add(new CRow(Row.of(value), true))
}
}
val operator = new KeyedProcessOperator[Integer, Integer, Integer](new IKeyedProcessFunction)
var testHarness = new KeyedOneInputStreamOperatorTestHarness[Integer, Integer, Integer](
operator,
new KeySelector[Integer, Integer] {
override def getKey(value: Integer): Integer= -1
},
Types.INT, 1, 1, 0)
testHarness.setup()
testHarness.open()
testHarness.processElement(new StreamRecord[Integer](1, 1L))
testHarness.processElement(new StreamRecord[Integer](2, 1L))
testHarness.processElement(new StreamRecord[Integer](3, 1L))
assertEquals(1, numKeyedStateEntries(operator))
val snapshot = testHarness.snapshot(0L, 0L)
testHarness.close()
testHarness = new KeyedOneInputStreamOperatorTestHarness[Integer, Integer, Integer](
operator,
new KeySelector[Integer, Integer] {
override def getKey(value: Integer): Integer= -1
},
Types.INT, 1, 1, 0)
testHarness.setup()
/////// This would throw the same exception as our production app do.
testHarness.initializeState(snapshot)
testHarness.open()
assertEquals(1, numKeyedStateEntries(operator))
testHarness.close()
}
{code}
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)