Posted to user@flink.apache.org by Ken Krugler <kk...@transpac.com> on 2022/03/17 18:36:28 UTC

Correct way to cleanly shut down StateFun Harness in test code

Hi all,

I’m using org.apache.flink.statefun.flink.harness.Harness in some unit test code, where I control the sources so that they are finite.

This is similar to what I found at https://stackoverflow.com/questions/61939681/is-it-possible-to-write-a-unit-test-which-terminates-using-flink-statefun-harnes

The problem is that if I shut down all of the sources, StateFun appears to start shutting down some internal resources prematurely, which results in this error:

Caused by: java.lang.IllegalStateException: Mailbox is in state CLOSED, but is required to be in state OPEN for put operations.
	at org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl.checkPutStateConditions(TaskMailboxImpl.java:256) ~[flink-streaming-java_2.12-1.11.4.jar:1.11.4]
	at org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl.put(TaskMailboxImpl.java:184) ~[flink-streaming-java_2.12-1.11.4.jar:1.11.4]
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl.execute(MailboxExecutorImpl.java:73) ~[flink-streaming-java_2.12-1.11.4.jar:1.11.4]
	at org.apache.flink.streaming.api.operators.MailboxExecutor.execute(MailboxExecutor.java:98) ~[flink-streaming-java_2.12-1.11.4.jar:1.11.4]
	at org.apache.flink.statefun.flink.core.common.MailboxExecutorFacade.execute(MailboxExecutorFacade.java:35) ~[statefun-flink-datastream-2.2.2.jar:2.2.2]
	at org.apache.flink.statefun.flink.core.feedback.FeedbackChannel$ConsumerTask.scheduleDrainAll(FeedbackChannel.java:114) ~[statefun-flink-datastream-2.2.2.jar:2.2.2]
	at org.apache.flink.statefun.flink.core.feedback.FeedbackChannel.put(FeedbackChannel.java:64) ~[statefun-flink-datastream-2.2.2.jar:2.2.2]
	at org.apache.flink.statefun.flink.core.feedback.FeedbackSinkOperator.processElement(FeedbackSinkOperator.java:58) ~[statefun-flink-datastream-2.2.2.jar:2.2.2]
	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:158) ~[flink-streaming-java_2.12-1.11.4.jar:1.11.4]
	at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:191) ~[flink-streaming-java_2.12-1.11.4.jar:1.11.4]
	at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:162) ~[flink-streaming-java_2.12-1.11.4.jar:1.11.4]
	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-streaming-java_2.12-1.11.4.jar:1.11.4]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:374) ~[flink-streaming-java_2.12-1.11.4.jar:1.11.4]
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:190) ~[flink-streaming-java_2.12-1.11.4.jar:1.11.4]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:608) ~[flink-streaming-java_2.12-1.11.4.jar:1.11.4]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:574) ~[flink-streaming-java_2.12-1.11.4.jar:1.11.4]
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:752) ~[flink-runtime_2.12-1.11.4.jar:1.11.4]
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:569) ~[flink-runtime_2.12-1.11.4.jar:1.11.4]
	at java.lang.Thread.run(Thread.java:834) ~[?:?]

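I believe the issue is that messages are still flowing through the feedback channel after its target has been terminated: FeedbackSinkOperator.processElement() calls FeedbackChannel.put(), which tries to schedule a drain on the consumer task's mailbox, and that mailbox is already CLOSED.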

I currently work around this by first stopping only the sources that can trigger new egress results, then waiting until all egresses have been idle for more than 2 seconds, and only then shutting down the remaining sources.

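But it feels fragile, since it depends on a fixed idle timeout rather than on knowing that all in-flight messages have drained.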
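A minimal sketch of the two-phase workaround described above, in plain Java so it can run without Flink on the classpath. It assumes each test egress consumer can call a hook on every record it receives, and that each test source loop polls a stop flag (the usual `while (running) { ... }` / cancel pattern). `EgressIdleTracker` and `TwoPhaseShutdown` are hypothetical helpers, not StateFun or Flink APIs; the "> 2 seconds" figure maps onto the `quietPeriod` parameter.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical test helper (not a StateFun API): records when the last egress
// record was seen, so a test can wait until the pipeline has gone quiet.
final class EgressIdleTracker {
    // System.nanoTime() of the most recent egress record (or of construction).
    private final AtomicLong lastActivityNanos = new AtomicLong(System.nanoTime());

    // Call from every test egress consumer whenever a record arrives.
    void recordActivity() {
        lastActivityNanos.set(System.nanoTime());
    }

    // Waits until no egress record has been seen for quietPeriod.
    // Returns false if timeout elapses first or the thread is interrupted.
    boolean awaitIdle(Duration quietPeriod, Duration timeout) {
        long quietNanos = quietPeriod.toNanos();
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            long now = System.nanoTime();
            long idleFor = now - lastActivityNanos.get();
            if (idleFor >= quietNanos) {
                return true;
            }
            if (now - deadline >= 0) {
                return false;
            }
            // Sleep only until the quiet period could be satisfied or the
            // deadline is reached, whichever comes first, then re-check.
            long sleepNanos = Math.min(quietNanos - idleFor, deadline - now);
            try {
                Thread.sleep(Math.max(1L, sleepNanos / 1_000_000L));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve interrupt status
                return false;
            }
        }
    }
}

// Hypothetical two-phase shutdown: stop the sources that can trigger new egress
// output, wait for the egresses to go quiet, then stop the remaining sources.
// Assumes each test source loop checks its flag, e.g. while (running.get()) { ... }.
final class TwoPhaseShutdown {
    static boolean run(AtomicBoolean triggeringSourcesRunning,
                       AtomicBoolean remainingSourcesRunning,
                       EgressIdleTracker tracker,
                       Duration quietPeriod,
                       Duration timeout) {
        triggeringSourcesRunning.set(false);                    // phase 1
        boolean idle = tracker.awaitIdle(quietPeriod, timeout); // heuristic wait
        remainingSourcesRunning.set(false);                     // phase 2
        return idle;
    }
}
```

In a test this would be called as something like `TwoPhaseShutdown.run(triggerFlag, otherFlag, tracker, Duration.ofSeconds(2), Duration.ofSeconds(30))`, failing the test if it returns false. Note that a quiet egress only suggests the feedback loop has drained: a message still circulating through the feedback channel without producing egress output would not be observed by this tracker, which is why the approach remains a timing heuristic.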

Thanks,

— Ken

PS - I'm using StateFun 2.2.2 (on Flink 1.11.4, per the stack trace).

--------------------------
Ken Krugler
http://www.scaleunlimited.com
Custom big data solutions
Flink, Pinot, Solr, Elasticsearch