Posted to issues@flink.apache.org by "jirawech.s (Jira)" <ji...@apache.org> on 2023/04/02 16:10:00 UTC
[jira] [Created] (FLINK-31689) Filesystem sink fails when parallelism of compactor operator changed
jirawech.s created FLINK-31689:
----------------------------------
Summary: Filesystem sink fails when parallelism of compactor operator changed
Key: FLINK-31689
URL: https://issues.apache.org/jira/browse/FLINK-31689
Project: Flink
Issue Type: Bug
Components: Connectors / FileSystem
Affects Versions: 1.16.1
Reporter: jirawech.s
Attachments: HelloFlinkHadoopSink.java
I encountered this error when using the Filesystem sink with Table SQL. I have not tested it with the DataStream API. The error is shown below:
{code:java}
java.util.NoSuchElementException
at java.util.ArrayList$Itr.next(ArrayList.java:864)
at org.apache.flink.connector.file.table.stream.compact.CompactOperator.initializeState(CompactOperator.java:119)
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:122)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:286)
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:700)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:676)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:643)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:917)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
at java.lang.Thread.run(Thread.java:750)
{code}
I cannot attach the full reproducible code here, but you can follow the pseudocode in the attachment and the reproduction steps below:
1. Create Kafka source
2. Set state.savepoints.dir
3. Set Job parallelism to 1
4. Create FileSystem Sink
5. Run the job and trigger a savepoint via the REST API
{noformat}
curl -X POST localhost:8081/jobs/:jobId/savepoints -d '{"cancel-job": false}'
{noformat}
6. Cancel the job, change the parallelism to 2, and resume the job from the savepoint
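
For readers without access to the attachment, steps 1 to 4 might be sketched roughly as below. This is not the attached HelloFlinkHadoopSink.java. The table names, schema, topic, broker address, paths, and formats are all hypothetical, and 'auto-compaction' = 'true' is an assumption inferred from the CompactOperator frame in the stack trace. The sketch only builds the settings and SQL strings; in a real job each statement would be passed to something like TableEnvironment.executeSql.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch of the job definition from steps 1-4. It builds plain strings only,
// so it runs without Flink on the classpath.
class CompactorRescaleRepro {

    // Steps 2 and 3: savepoint directory and job parallelism.
    // Both keys are standard Flink configuration options; the values are placeholders.
    public static Map<String, String> jobConfig(int parallelism, String savepointDir) {
        Map<String, String> conf = new LinkedHashMap<>();
        conf.put("state.savepoints.dir", savepointDir);
        conf.put("parallelism.default", Integer.toString(parallelism));
        return conf;
    }

    // Step 1: Kafka source table. Schema, topic, and servers are hypothetical.
    public static String kafkaSourceDdl() {
        return String.join("\n",
            "CREATE TABLE kafka_source (",
            "  id BIGINT,",
            "  msg STRING",
            ") WITH (",
            "  'connector' = 'kafka',",
            "  'topic' = 'hello-topic',",
            "  'properties.bootstrap.servers' = 'localhost:9092',",
            "  'properties.group.id' = 'hello-group',",
            "  'scan.startup.mode' = 'earliest-offset',",
            "  'format' = 'json'",
            ")");
    }

    // Step 4: filesystem sink table. Auto-compaction is assumed to be enabled,
    // since the compactor operator appears in the reported stack trace.
    public static String fileSinkDdl(String path) {
        return String.join("\n",
            "CREATE TABLE fs_sink (",
            "  id BIGINT,",
            "  msg STRING",
            ") WITH (",
            "  'connector' = 'filesystem',",
            "  'path' = '" + path + "',",
            "  'format' = 'parquet',",
            "  'auto-compaction' = 'true'",
            ")");
    }

    // The streaming insert that connects source and sink.
    public static String insertStatement() {
        return "INSERT INTO fs_sink SELECT id, msg FROM kafka_source";
    }

    public static void main(String[] args) {
        // Print the settings and statements in the order they would be applied.
        jobConfig(1, "file:///tmp/savepoints")
            .forEach((k, v) -> System.out.println(k + ": " + v));
        System.out.println(kafkaSourceDdl() + ";");
        System.out.println(fileSinkDdl("file:///tmp/fs-sink-output") + ";");
        System.out.println(insertStatement() + ";");
    }
}
```

For step 6, the same sketch would be reused with jobConfig(2, ...) before resuming from the savepoint taken in step 5.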
--
This message was sent by Atlassian Jira
(v8.20.10#820010)