Posted to issues@flink.apache.org by "Junrui Li (Jira)" <ji...@apache.org> on 2024/01/17 11:42:00 UTC

[jira] [Comment Edited] (FLINK-34132) Batch WordCount job fails when run with AdaptiveBatch scheduler

    [ https://issues.apache.org/jira/browse/FLINK-34132?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17807701#comment-17807701 ] 

Junrui Li edited comment on FLINK-34132 at 1/17/24 11:41 AM:
-------------------------------------------------------------

[~prabhujoseph] According to its documented [limitation|https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/elastic_scaling/#limitations-2], the adaptive batch scheduler only supports jobs whose shuffle mode is {{ALL_EXCHANGES_BLOCKING / ALL_EXCHANGES_HYBRID_FULL / ALL_EXCHANGES_HYBRID_SELECTIVE}}.

However, the config option execution.batch-shuffle-mode applies only to DataStream jobs, not to DataSet jobs. For DataSet jobs, ExecutionMode.BATCH_FORCED could be considered a workaround, though I am not entirely certain of this.
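
For reference, a minimal configuration sketch for the DataStream case. It assumes the job is a DataStream job running in batch mode and that the settings live in the cluster configuration file. The chosen shuffle mode is only one of the three supported values, and none of this is expected to affect a DataSet job such as the WordCount example.

{code}
# Sketch only: applies to DataStream batch jobs, not to DataSet jobs.
jobmanager.scheduler: AdaptiveBatch
# One of the three shuffle modes the adaptive batch scheduler accepts.
execution.batch-shuffle-mode: ALL_EXCHANGES_BLOCKING
{code}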

That said, I think the documentation on the website is not very clear about this limitation and could confuse users. It would be better to update it to clarify this point.

[~wanglijie] , WDYT?
 


was (Author: JIRAUSER293716):
[~prabhujoseph] According to its documented [limitation|https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/elastic_scaling/#limitations-2], the adaptive batch scheduler only supports jobs whose shuffle mode is {{ALL_EXCHANGES_BLOCKING / ALL_EXCHANGES_HYBRID_FULL / ALL_EXCHANGES_HYBRID_SELECTIVE}}.

However, the config option execution.batch-shuffle-mode applies only to DataStream jobs, not to DataSet jobs. For DataSet jobs, ExecutionMode.BATCH_FORCED could be considered a workaround.

That said, I think the documentation on the website is not very clear about this limitation and could confuse users. It would be better to update it to clarify this point.

[~wanglijie] , WDYT?
 

> Batch WordCount job fails when run with AdaptiveBatch scheduler
> ---------------------------------------------------------------
>
>                 Key: FLINK-34132
>                 URL: https://issues.apache.org/jira/browse/FLINK-34132
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 1.17.1, 1.18.1
>            Reporter: Prabhu Joseph
>            Priority: Major
>
> Batch WordCount job fails when run with AdaptiveBatch scheduler.
> *Repro Steps*
> {code:java}
> flink-yarn-session -Djobmanager.scheduler=adaptive -d
>  flink run -d /usr/lib/flink/examples/batch/WordCount.jar --input s3://prabhuflinks3/INPUT --output s3://prabhuflinks3/OUT
> {code}
> *Error logs*
> {code:java}
>  The program finished with the following exception:
> org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: java.util.concurrent.ExecutionException: java.lang.RuntimeException: org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster.
> 	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
> 	at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
> 	at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:105)
> 	at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:851)
> 	at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:245)
> 	at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1095)
> 	at org.apache.flink.client.cli.CliFrontend.lambda$mainInternal$9(CliFrontend.java:1189)
> 	at java.security.AccessController.doPrivileged(Native Method)
> 	at javax.security.auth.Subject.doAs(Subject.java:422)
> 	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1899)
> 	at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> 	at org.apache.flink.client.cli.CliFrontend.mainInternal(CliFrontend.java:1189)
> 	at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1157)
> Caused by: java.lang.RuntimeException: java.util.concurrent.ExecutionException: java.lang.RuntimeException: org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster.
> 	at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321)
> 	at org.apache.flink.api.java.ExecutionEnvironment.executeAsync(ExecutionEnvironment.java:1067)
> 	at org.apache.flink.client.program.ContextEnvironment.executeAsync(ContextEnvironment.java:144)
> 	at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:73)
> 	at org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:106)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:498)
> 	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
> 	... 12 more
> Caused by: java.util.concurrent.ExecutionException: java.lang.RuntimeException: org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster.
> 	at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
> 	at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
> 	at org.apache.flink.api.java.ExecutionEnvironment.executeAsync(ExecutionEnvironment.java:1062)
> 	... 20 more
> Caused by: java.lang.RuntimeException: org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster.
> 	at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321)
> 	at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedFunction$2(FunctionUtils.java:75)
> 	at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
> 	at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
> 	at java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:457)
> 	at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
> 	at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
> 	at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
> 	at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
> Caused by: org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster.
> 	at org.apache.flink.runtime.jobmaster.DefaultJobMasterServiceProcess.lambda$new$0(DefaultJobMasterServiceProcess.java:97)
> 	at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
> 	at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
> 	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
> 	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1609)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> 	at java.lang.Thread.run(Thread.java:750)
> Caused by: java.util.concurrent.CompletionException: java.lang.IllegalStateException: At the moment, adaptive batch scheduler requires batch workloads to be executed with types of all edges being BLOCKING or HYBRID_FULL/HYBRID_SELECTIVE. To do that, you need to configure 'execution.batch-shuffle-mode' to 'ALL_EXCHANGES_BLOCKING' or 'ALL_EXCHANGES_HYBRID_FULL/ALL_EXCHANGES_HYBRID_SELECTIVE'.
> 	at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
> 	at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
> 	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1606)
> 	... 3 more
> Caused by: java.lang.IllegalStateException: At the moment, adaptive batch scheduler requires batch workloads to be executed with types of all edges being BLOCKING or HYBRID_FULL/HYBRID_SELECTIVE. To do that, you need to configure 'execution.batch-shuffle-mode' to 'ALL_EXCHANGES_BLOCKING' or 'ALL_EXCHANGES_HYBRID_FULL/ALL_EXCHANGES_HYBRID_SELECTIVE'.
> 	at org.apache.flink.util.Preconditions.checkState(Preconditions.java:193)
> 	at org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchSchedulerFactory.checkAllExchangesAreSupported(AdaptiveBatchSchedulerFactory.java:324)
> 	at org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchSchedulerFactory.createInstance(AdaptiveBatchSchedulerFactory.java:127)
> 	at org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:124)
> 	at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:393)
> 	at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:362)
> 	at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:128)
> 	at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:100)
> 	at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112)
> 	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
> 	... 3 more
> {code}
> *Analysis*
> I have configured the execution.batch-shuffle-mode to use either ALL_EXCHANGES_BLOCKING, ALL_EXCHANGES_HYBRID_FULL, or ALL_EXCHANGES_HYBRID_SELECTIVE, but all attempts resulted in the same error message.
> The WordCount program runs fine when the following is set in the code:
> {code:java}
> env.getConfig().setExecutionMode(ExecutionMode.BATCH_FORCED);
> {code}
> We need to investigate why execution.batch-shuffle-mode is not being recognized and, if this behavior is intentional, correct the misleading error message. Additionally, the WordCount job should be fixed so that it runs seamlessly with both the batch and adaptive schedulers.


