Posted to dev@flink.apache.org by "vinoyang (Jira)" <ji...@apache.org> on 2019/10/17 07:59:00 UTC

[jira] [Created] (FLINK-14428) Non-consistency key access in KeyedProcessFunction when use keyed state in both processElement and onTimer method

vinoyang created FLINK-14428:
--------------------------------

             Summary: Non-consistency key access in KeyedProcessFunction when use keyed state in both processElement and onTimer method
                 Key: FLINK-14428
                 URL: https://issues.apache.org/jira/browse/FLINK-14428
             Project: Flink
          Issue Type: Bug
          Components: API / DataStream
            Reporter: vinoyang


Scenario:

In {{KeyedProcessFunction}}, using the keyed state API in both the {{processElement}} and {{onTimer}} methods may cause inconsistent key access.

Analysis:

For timers, {{InternalTimerServiceImpl}} sets the key context to the key carried by the timer (the key captured when registerXXXTimeTimer was called) before invoking the callback:


{code:java}
public void onProcessingTime(long time) throws Exception {
	// null out the timer in case the Triggerable calls registerProcessingTimeTimer()
	// inside the callback.
	nextTimer = null;

	InternalTimer<K, N> timer;

	while ((timer = processingTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
		processingTimeTimersQueue.poll();
		keyContext.setCurrentKey(timer.getKey());        // here
		triggerTarget.onProcessingTime(timer);
	}

	if (timer != null && nextTimer == null) {
		nextTimer = processingTimeService.registerTimer(timer.getTimestamp(), this);
	}
}
{code}

For the {{processElement}} method, {{OneInputStreamTask}} calls it after setting the key context:


{code:java}
@Override
public void emitRecord(StreamRecord<IN> record) throws Exception {
	synchronized (lock) {
		numRecordsIn.inc();
		operator.setKeyContextElement1(record);        // here
		operator.processElement(record);
	}
}
{code}

The {{setCurrentKey}} call in the first code snippet and the {{setKeyContextElement1}} call in the second code snippet both end up in the same {{AbstractStreamOperator#setCurrentKey}} method. However, there is only one keyed state backend instance, and {{AbstractStreamOperator#setCurrentKey}} changes the current key of that keyed state backend.

So if we use the keyed state API in both {{processElement}} and {{onTimer}}, we may read the wrong state value, because either method may change the current key and cause an inconsistency.
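
To make the concern concrete, here is a toy model of the design described above. It is only a sketch and not Flink code: {{ToyKeyedBackend}} and its methods are hypothetical names, and the "element path" and "timer path" are plain sequential calls. It shows that with one backend and one mutable current key, a state read returns whatever belongs to the key set by the most recent caller.

```java
import java.util.HashMap;
import java.util.Map;

public class SharedKeyContextSketch {

    /** Hypothetical stand-in for a keyed state backend with a single mutable current key. */
    public static class ToyKeyedBackend {
        private String currentKey;
        private final Map<String, Integer> values = new HashMap<>();

        public void setCurrentKey(String key) { this.currentKey = key; }
        public String getCurrentKey() { return currentKey; }

        // State access is always scoped by whatever the current key is right now.
        public void update(int value) { values.put(currentKey, value); }
        public Integer value() { return values.get(currentKey); }
    }

    public static void main(String[] args) {
        ToyKeyedBackend backend = new ToyKeyedBackend();

        // "Element path": the key is taken from the record, then user code writes state.
        backend.setCurrentKey("a");
        backend.update(1);

        // "Timer path": the key is taken from the timer, then user code writes state.
        backend.setCurrentKey("b");
        backend.update(2);

        // A read that does not set the key first sees the key left by the last caller.
        System.out.println(backend.getCurrentKey() + " -> " + backend.value()); // b -> 2

        // Setting the key again restores access to the state of key "a".
        backend.setCurrentKey("a");
        System.out.println(backend.getCurrentKey() + " -> " + backend.value()); // a -> 1
    }
}
```

In this model the result is only correct when every path sets the key immediately before touching state and nothing else changes the key in between; whether the real operator guarantees that is exactly the question raised in this issue.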



