Posted to user@flink.apache.org by Diomedes Tydeus <di...@gmail.com> on 2018/08/06 21:42:25 UTC
flink 1.4.2 NPE after job startup when using managed ValueState
Dear Flink Users,
I have two environments, one old and one new, and I'm trying to migrate my
Flink job from one to the other. The exact same application code that runs
in one environment fails in the second with a very confusing NPE (listed
below). I can guarantee the application code only calls ValueState.value()
inside of a keyed context (and it works perfectly in my other environment).
I get the same error when I try with RocksDB, except that the stack trace
changes a little. I jumped into the source code, but I lose the thread at
the point where InternalKeyedContext is passed via the constructor (I'm not
sure where to jump to from there).
Other than the obvious (keyed context), why else might I get this error?
(application code follows exception)
java.lang.NullPointerException: No key set. This method should not be called outside of a keyed context.
    at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:75)
    at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.checkKeyNamespacePreconditions(CopyOnWriteStateTable.java:528)
    at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.computeHashForOperationAndDoIncrementalRehash(CopyOnWriteStateTable.java:722)
    at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:265)
    at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:306)
    at org.apache.flink.runtime.state.heap.HeapValueState.value(HeapValueState.java:55)
    at com.physiq.vitalink.timeseries.processor.seriesframecoverage.WriteDataCoverage.flatMap(WriteDataCoverage.kt:30)
    at com.physiq.vitalink.timeseries.processor.seriesframecoverage.WriteDataCoverage.flatMap(WriteDataCoverage.kt:12)
    at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:207)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
    at java.lang.Thread.run(Thread.java:748)
Application code---
override fun open(config: Configuration) {
    val descriptor = ValueStateDescriptor<HashMap<Long, ArrayList<Long>>>(
        dataName,
        TypeInformation.of(object : TypeHint<HashMap<Long, ArrayList<Long>>>() {})
    )
    descriptor.setQueryable(dataName)
    coverageData = runtimeContext.getState(descriptor)
}

override fun flatMap(value: CoverageWithPartitionInfo, out: Collector<String>) {
    var currentCoverage = coverageData.value()
--- this is the line that blows up on one environment but not others ^^ ---
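
For anyone reading along, the check that produces this message can be modeled in a few lines of plain Kotlin. This is only a simplified sketch of my understanding, not Flink's actual implementation: KeyContext, ModelValueState, and the key "patient-1" are made-up names for illustration, and the only thing taken from the trace is the exception message.

```kotlin
// Simplified, hypothetical model of keyed state. NOT Flink's real classes.
// It only illustrates why a state read fails when no current key has been set.

class KeyContext<K> {
    // Assumption: the runtime sets this before handing each element to the function.
    var currentKey: K? = null
}

class ModelValueState<K, V>(private val keyContext: KeyContext<K>) {
    private val table = HashMap<K, V>()

    // Mirrors the precondition in the trace: every access needs a non-null current key.
    private fun requireKey(): K =
        keyContext.currentKey
            ?: throw NullPointerException(
                "No key set. This method should not be called outside of a keyed context."
            )

    fun value(): V? = table[requireKey()]

    fun update(newValue: V) {
        table[requireKey()] = newValue
    }
}

fun main() {
    val context = KeyContext<String>()
    val state = ModelValueState<String, Long>(context)

    // Reading before any key is set fails with the same message as in the trace.
    try {
        state.value()
    } catch (e: NullPointerException) {
        println("failed: ${e.message}")
    }

    // Once a key is set, reads and writes are scoped to that key.
    context.currentKey = "patient-1"
    state.update(42L)
    println(state.value())
}
```

In this model the failure depends only on whether a current key was set before the read, which is why I keep looking for places where that step could be skipped or where the key itself could come out null.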