Posted to issues@flink.apache.org by "luoyuxia (Jira)" <ji...@apache.org> on 2023/04/04 01:49:00 UTC

[jira] [Comment Edited] (FLINK-31689) Filesystem sink fails when parallelism of compactor operator changed

    [ https://issues.apache.org/jira/browse/FLINK-31689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17708187#comment-17708187 ] 

luoyuxia edited comment on FLINK-31689 at 4/4/23 1:48 AM:
----------------------------------------------------------

It's in [here|https://github.com/apache/flink/blob/0915c9850d861165e283acc0f60545cd836f0567/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/stream/compact/CompactOperator.java#L114].

I think the problem is this line of code:
{code:java}
this.expiredFiles.putAll(this.expiredFilesState.get().iterator().next()); {code}
We can first check whether this.expiredFilesState.get().iterator().hasNext() returns true, and only then call putAll.

I think you can modify this line, rebuild the flink-connector-files module, and then try again to see whether it works.
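
A rough sketch of the check, in plain Java rather than the actual Flink code. My guess is that after the parallelism changes, the restored list state is split across the subtasks, so one of them can see an empty list and next() throws. All names here are made up for illustration: ExpiredFilesRestoreSketch and restoreExpiredFiles are hypothetical, the Iterable parameter stands in for what this.expiredFilesState.get() returns, and String stands in for Flink's Path.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class ExpiredFilesRestoreSketch {

    /**
     * Copies the first restored snapshot, if there is one, into a fresh TreeMap.
     * restoredState is a hypothetical stand-in for the Iterable returned by
     * expiredFilesState.get() in CompactOperator.
     */
    public static TreeMap<Long, List<String>> restoreExpiredFiles(
            Iterable<Map<Long, List<String>>> restoredState) {
        TreeMap<Long, List<String>> expiredFiles = new TreeMap<>();
        Iterator<Map<Long, List<String>>> it = restoredState.iterator();
        // Guard: only call next() when the restored state actually has an entry.
        if (it.hasNext()) {
            expiredFiles.putAll(it.next());
        }
        return expiredFiles;
    }

    public static void main(String[] args) {
        // A subtask that received the single snapshot entry.
        Map<Long, List<String>> snapshot = new TreeMap<>();
        snapshot.put(1L, Arrays.asList("part-0-file"));
        System.out.println(restoreExpiredFiles(Collections.singletonList(snapshot)));

        // A subtask that received no entry; an unguarded next() would throw
        // NoSuchElementException here, the guarded version yields an empty map.
        System.out.println(restoreExpiredFiles(
                Collections.<Map<Long, List<String>>>emptyList()));
    }
}
```

Reusing a single iterator for hasNext() and next() avoids asking the state for a second iterator, though calling this.expiredFilesState.get().iterator() twice should behave the same way.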



> Filesystem sink fails when parallelism of compactor operator changed
> --------------------------------------------------------------------
>
>                 Key: FLINK-31689
>                 URL: https://issues.apache.org/jira/browse/FLINK-31689
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / FileSystem
>    Affects Versions: 1.16.1
>            Reporter: jirawech.s
>            Priority: Major
>         Attachments: HelloFlinkHadoopSink.java
>
>
> I encountered this error when I tried to use the Filesystem sink with Table SQL. I have not tested it with the DataStream API, though. The error is shown below:
> {code:java}
> java.util.NoSuchElementException
> 	at java.util.ArrayList$Itr.next(ArrayList.java:864)
> 	at org.apache.flink.connector.file.table.stream.compact.CompactOperator.initializeState(CompactOperator.java:119)
> 	at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:122)
> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:286)
> 	at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:700)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:676)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:643)
> 	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
> 	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:917)
> 	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
> 	at java.lang.Thread.run(Thread.java:750) {code}
> I cannot attach the full reproducible code here, but you can follow the pseudocode in the attachment and the reproduction steps below:
> 1. Create Kafka source
> 2. Set state.savepoints.dir
> 3. Set Job parallelism to 1
> 4. Create FileSystem Sink
> 5. Run the job and trigger savepoint with API
> {noformat}
> curl -X POST localhost:8081/jobs/:jobId/savepoints -d '{"cancel-job": false}'{noformat}
> 6. Cancel the job, change the parallelism to 2, and resume the job from the savepoint



--
This message was sent by Atlassian Jira
(v8.20.10#820010)