Posted to issues@flink.apache.org by "jirawech.s (Jira)" <ji...@apache.org> on 2023/04/02 16:10:00 UTC

[jira] [Created] (FLINK-31689) Filesystem sink fails when parallelism of compactor operator changed

jirawech.s created FLINK-31689:
----------------------------------

             Summary: Filesystem sink fails when parallelism of compactor operator changed
                 Key: FLINK-31689
                 URL: https://issues.apache.org/jira/browse/FLINK-31689
             Project: Flink
          Issue Type: Bug
          Components: Connectors / FileSystem
    Affects Versions: 1.16.1
            Reporter: jirawech.s
         Attachments: HelloFlinkHadoopSink.java

I encountered this error when using the Filesystem sink with Table SQL. I have not tested it with the DataStream API. The error is shown below:
{code:java}
java.util.NoSuchElementException
	at java.util.ArrayList$Itr.next(ArrayList.java:864)
	at org.apache.flink.connector.file.table.stream.compact.CompactOperator.initializeState(CompactOperator.java:119)
	at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:122)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:286)
	at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:700)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:676)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:643)
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:917)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
	at java.lang.Thread.run(Thread.java:750) {code}
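The top frame, ArrayList$Itr.next, throws NoSuchElementException when the iterator has nothing left, so one possible reading of the trace is that a restored list state is empty on at least one subtask after rescaling. The sketch below is not the Flink code. It is a plain Java model under two assumptions: that the operator's list state is split round-robin across the new subtasks, and that the restore path reads the first element without checking for emptiness. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

class EmptyRestoredStateSketch {

    // Simplified model (assumption): state entries from the savepoint are
    // handed out round-robin to the subtasks of the rescaled operator.
    static List<List<String>> redistribute(List<String> entries, int newParallelism) {
        List<List<String>> perSubtask = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            perSubtask.add(new ArrayList<>());
        }
        for (int i = 0; i < entries.size(); i++) {
            perSubtask.get(i % newParallelism).add(entries.get(i));
        }
        return perSubtask;
    }

    // Unguarded read: throws NoSuchElementException on an empty list.
    static String restoreUnguarded(List<String> restored) {
        return restored.iterator().next();
    }

    // Guarded read: falls back to a default when nothing was restored.
    static String restoreGuarded(List<String> restored, String fallback) {
        Iterator<String> it = restored.iterator();
        return it.hasNext() ? it.next() : fallback;
    }

    public static void main(String[] args) {
        // One state entry, written while the job ran with parallelism 1.
        List<String> savepointState = List.of("state-of-subtask-0");
        // Restore with parallelism 2: one subtask receives an empty list.
        List<List<String>> restored = redistribute(savepointState, 2);
        for (int subtask = 0; subtask < restored.size(); subtask++) {
            try {
                System.out.println("subtask " + subtask + ": "
                        + restoreUnguarded(restored.get(subtask)));
            } catch (NoSuchElementException e) {
                System.out.println("subtask " + subtask
                        + ": NoSuchElementException, guarded read returns "
                        + restoreGuarded(restored.get(subtask), "<empty>"));
            }
        }
    }
}
```

If this reading is right, a hasNext() check before the first read would avoid the exception, but I have not confirmed it against CompactOperator itself.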
I cannot attach the full reproducible code here, but you can follow the pseudocode in the attachment and the reproduction steps below:
1. Create Kafka source

2. Set state.savepoints.dir

3. Set Job parallelism to 1

4. Create FileSystem Sink

5. Run the job and trigger a savepoint through the REST API
{noformat}
curl -X POST localhost:8081/jobs/:jobId/savepoints -d '{"cancel-job": false}'{noformat}
6. Cancel the job, change the parallelism to 2, and resume the job from the savepoint
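
For anyone who prefers to script step 5, the same savepoint trigger can be sent from Java instead of curl. This is a minimal sketch, assuming Java 11 or later for java.net.http and a JobManager REST endpoint at http://localhost:8081. The class and method names are hypothetical, and the job ID has to be passed in as the first argument.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

class SavepointRequestSketch {

    // Builds the same POST as the curl command in step 5.
    static HttpRequest buildSavepointRequest(String restBaseUrl, String jobId) {
        return HttpRequest.newBuilder()
                .uri(URI.create(restBaseUrl + "/jobs/" + jobId + "/savepoints"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString("{\"cancel-job\": false}"))
                .build();
    }

    public static void main(String[] args) throws Exception {
        if (args.length != 1) {
            System.err.println("usage: SavepointRequestSketch <jobId>");
            return;
        }
        HttpRequest request = buildSavepointRequest("http://localhost:8081", args[0]);
        // Sends the request and prints the raw JSON response from the cluster.
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```

No target directory is sent in the request body because state.savepoints.dir is already set in step 2.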



--
This message was sent by Atlassian Jira
(v8.20.10#820010)