Posted to issues@flink.apache.org by "Martijn Visser (Jira)" <ji...@apache.org> on 2022/12/16 10:36:00 UTC

[jira] [Assigned] (FLINK-30278) Unexpected config mutation in SinkTransformationTranslator

     [ https://issues.apache.org/jira/browse/FLINK-30278?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Martijn Visser reassigned FLINK-30278:
--------------------------------------

    Assignee: Piotr Nowojski

> Unexpected config mutation in SinkTransformationTranslator 
> -----------------------------------------------------------
>
>                 Key: FLINK-30278
>                 URL: https://issues.apache.org/jira/browse/FLINK-30278
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Configuration
>    Affects Versions: 1.16.0, 1.17.0
>            Reporter: Anton Kalashnikov
>            Assignee: Piotr Nowojski
>            Priority: Major
>              Labels: pull-request-available
>
> If we forbid changing the configuration programmatically (`execution.program-config.enabled`) and try to use `FileSink`, the following exception occurs:
> {noformat}
>   org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Not allowed configuration change(s) were detected:
>  - Configuration parallelism.default:1 not allowed in the configuration object ExecutionConfig.
>    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:364)
>    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:225)
>    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:98)
>    at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:319)
>    at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAsync$2(ApplicationDispatcherBootstrap.java:262)
>    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
>    at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
>    at org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:171)
>    at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
>    at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$withContextClassLoader$0(ClassLoadingUtils.java:41)
>    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49)
>    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48)
>    at java.base/java.util.concurrent.ForkJoinTask.doExec(Unknown Source)
>    at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown Source)
>    at java.base/java.util.concurrent.ForkJoinPool.scan(Unknown Source)
>    at java.base/java.util.concurrent.ForkJoinPool.runWorker(Unknown Source)
>    at java.base/java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source)
> Caused by: org.apache.flink.client.program.MutatedConfigurationException: Not allowed configuration change(s) were detected:
>  - Configuration parallelism.default:1 not allowed in the configuration object ExecutionConfig.
>    at org.apache.flink.client.program.StreamContextEnvironment.checkNotAllowedConfigurations(StreamContextEnvironment.java:235)
>    at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:175)
>    at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:115)
>    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:2049)
>    at org.apache.flink.streaming.examples.wordcount.WordCount.main(WordCount.java:81)
>    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
>    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
>    at java.base/java.lang.reflect.Method.invoke(Unknown Source)
>    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:347)
>    ... 16 more
> {noformat}
> This happens because `SinkTransformationTranslator` contains the following logic:
> * Remember the current parallelism
> * Set the parallelism to the default
> * Do the transformation
> * Set the parallelism back to the remembered value
> If the initial parallelism is already the default, we should do nothing. With the current logic, however, we explicitly write the default value into the configuration, which is itself a programmatic config mutation (the very thing we want to avoid).
> See org.apache.flink.streaming.runtime.translators.SinkTransformationTranslator.SinkExpander#executionEnvironment:341
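
The remember/set/restore sequence described above can be sketched in plain Java. This is a minimal sketch and not Flink code: `TrackedConfig`, `translateNaive`, and `translateGuarded` are hypothetical names, the fallback value of 1 is an assumption taken from the `parallelism.default:1` in the exception message, and the guarded variant is only one possible way to avoid the write, not necessarily the fix adopted for this ticket.

```java
// Hypothetical stand-in types; none of this is Flink's actual API.
final class ParallelismRestoreSketch {

    /** Assumed fallback used when the user never set a parallelism. */
    static final int FALLBACK_PARALLELISM = 1;

    /** Minimal config that records whether the value was set explicitly. */
    static final class TrackedConfig {
        private Integer explicitParallelism; // null means "never set by the program"

        int getParallelism() {
            return explicitParallelism != null ? explicitParallelism : FALLBACK_PARALLELISM;
        }

        void setParallelism(int parallelism) {
            explicitParallelism = parallelism; // any call counts as an explicit write
        }

        void clearParallelism() {
            explicitParallelism = null;
        }

        boolean isExplicitlySet() {
            return explicitParallelism != null;
        }
    }

    /** Mirrors the four steps from the description: always writes the value back. */
    static void translateNaive(TrackedConfig config, Runnable transformation) {
        int remembered = config.getParallelism(); // remember
        config.clearParallelism();                // set to default
        transformation.run();                     // do transformation
        config.setParallelism(remembered);        // restore, even if nothing was set before
    }

    /** One possible guard: only write back if the value was explicit to begin with. */
    static void translateGuarded(TrackedConfig config, Runnable transformation) {
        boolean wasExplicit = config.isExplicitlySet();
        int remembered = config.getParallelism();
        config.clearParallelism();
        transformation.run();
        if (wasExplicit) {
            config.setParallelism(remembered);
        }
    }

    public static void main(String[] args) {
        TrackedConfig naive = new TrackedConfig();
        translateNaive(naive, () -> { });
        System.out.println("naive leaves explicit value: " + naive.isExplicitlySet());

        TrackedConfig guarded = new TrackedConfig();
        translateGuarded(guarded, () -> { });
        System.out.println("guarded leaves explicit value: " + guarded.isExplicitlySet());
    }
}
```

In this sketch the naive variant leaves the configuration with an explicitly set parallelism even though the program never asked for one, which is the kind of change a mutation check would flag. The guarded variant leaves an untouched configuration untouched and still restores a value the user did set.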



--
This message was sent by Atlassian Jira
(v8.20.10#820010)