You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Diomedes Tydeus <di...@gmail.com> on 2018/08/06 21:42:25 UTC

flink 1.4.2 NPE after job startup when using managed ValueState

Dear Flink Users,
I have two environments, one old, one new.  I'm trying to migrate my flink
job.  The exact same application code that runs in one environment, fails
in the second with a very confusing NPE (listed below).  I can garuntee the
application code only calls ValueState.get() inside of a keyed context (and
works perfectly in my other environment).  I get the same error when I try
with rocksDB other than the stack trace changes just a little bit.  I
jumped into the source code, but I sort of lose it when
InternalKeyedContext is passed via the constructor (I'm not sure where to
jump to from there).

Other than the obvious (keyed context) why else might I get this error?
(application code follows exception)

java.lang.NullPointerException: No key set. This method should not be
called outside of a keyed context.
    at
org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:75)
    at
org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.checkKeyNamespacePreconditions(CopyOnWriteStateTable.java:528)
    at
org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.computeHashForOperationAndDoIncrementalRehash(CopyOnWriteStateTable.java:722)
    at
org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:265)
    at
org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:306)
    at
org.apache.flink.runtime.state.heap.HeapValueState.value(HeapValueState.java:55)
    at
com.physiq.vitalink.timeseries.processor.seriesframecoverage.WriteDataCoverage.flatMap(WriteDataCoverage.kt:30)
    at
com.physiq.vitalink.timeseries.processor.seriesframecoverage.WriteDataCoverage.flatMap(WriteDataCoverage.kt:12)
    at
org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
    at
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:207)
    at
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
    at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
    at java.lang.Thread.run(Thread.java:748)


Application code---

    override fun open(config: Configuration) {
        val descriptor = ValueStateDescriptor <HashMap<Long,
ArrayList<Long>>>(dataName, TypeInformation.of(object :
TypeHint<HashMap<Long, ArrayList<Long>>>() {}))
        descriptor.setQueryable(dataName)
        coverageData = runtimeContext.getState(descriptor)
    }

    override fun flatMap(value: CoverageWithPartitionInfo, out:
Collector<String>) {

        var currentCoverage = coverageData.value()
---  this is the line that blows up on one environment but not others ^^ ---