Posted to issues@flink.apache.org by "Tang Yan (Jira)" <ji...@apache.org> on 2021/01/20 12:53:01 UTC

[jira] [Comment Edited] (FLINK-21023) Task Manager uses the container dir of Job Manager when running flink job on yarn-cluster.

    [ https://issues.apache.org/jira/browse/FLINK-21023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17268555#comment-17268555 ] 

Tang Yan edited comment on FLINK-21023 at 1/20/21, 12:52 PM:
-------------------------------------------------------------

[~trohrmann] 

Thanks for your reply. WordCount is only a simple test. In my case, I just want to put some configuration into an XX.conf file and have my Flink code read that file to get a key-value map. For such cases I used '--files' in Spark jobs. Can -yt not support this in a Flink job?

env.readTextFile(params.get("input")) is the only line in my code that reads the file.
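A minimal sketch of the "read a shipped config file into a key-value map" use case, under stated assumptions. The report below shows the shipped conf directory appearing under each container's own directory (..._000001/conf on the job manager node, ..._000002/conf on the task manager node), so this sketch assumes the file should be resolved against the working directory of whichever JVM actually reads it, and that XX.conf uses simple key=value lines. ShippedConfig, resolveLocally and load are hypothetical names, not a Flink API.

```java
import java.io.IOException;
import java.io.Reader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

/**
 * Hypothetical helper (not part of Flink): loads a key=value config file that is
 * assumed to have been shipped with -yt into the container's working directory.
 */
final class ShippedConfig {

    private ShippedConfig() {}

    /**
     * Resolves a relative path against the current working directory of THIS JVM.
     * Calling it on a TaskManager yields that TaskManager's container path.
     */
    public static Path resolveLocally(String relativePath) {
        return Paths.get(System.getProperty("user.dir")).resolve(relativePath).normalize();
    }

    /** Parses a java.util.Properties-style file (assumed format) into a String map. */
    public static Map<String, String> load(Path file) throws IOException {
        Properties props = new Properties();
        try (Reader reader = Files.newBufferedReader(file, StandardCharsets.UTF_8)) {
            props.load(reader);
        }
        Map<String, String> result = new HashMap<>();
        for (String key : props.stringPropertyNames()) {
            result.put(key, props.getProperty(key));
        }
        return result;
    }
}
```

In a job, one would call something like ShippedConfig.load(ShippedConfig.resolveLocally("conf/cmp_online.cfg")) from code that runs on the TaskManager (for example a RichFunction's open() method) rather than in main(), so the relative path is resolved on the node that reads it. Whether this is the recommended way to consume -yt files is an assumption of this sketch.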



> Task Manager uses the container dir of Job Manager when running flink job on yarn-cluster.
> ------------------------------------------------------------------------------------------
>
>                 Key: FLINK-21023
>                 URL: https://issues.apache.org/jira/browse/FLINK-21023
>             Project: Flink
>          Issue Type: Bug
>          Components: Client / Job Submission, Deployment / YARN
>    Affects Versions: 1.12.0, 1.11.1
>            Reporter: Tang Yan
>            Priority: Major
>
> I want to use the -yt (yarnship) option to distribute my config files to the YARN cluster and read the file in my code. For this I used the Flink WordCount example.
> Here is my submit command:
> /opt/Flink/bin/flink run -m yarn-cluster -p 1 -yt /path/to/conf -c org.apache.flink.examples.java.wordcount.WordCount /opt/Flink/examples/batch/WordCount.jar --input ./conf/cmp_online.cfg
> Test Result:
> I found that if the job manager and task manager are launched on the same node, the job runs successfully. But when they run on different nodes, the job fails with the errors below. The conf folder has been distributed to the container cache dirs, for example file:/data/d1/yarn/nm/usercache/yanta/appcache/application_1609125504851_3620/container_e283_1609125504851_3620_01_000001/conf on the job manager node and file:/data/d1/yarn/nm/usercache/yanta/appcache/application_1609125504851_3620/container_e283_1609125504851_3620_01_000002/conf on the task manager node. So why does the task manager load the conf file from the container_eXXX_000001 path, which is located on the job manager node?
> 2021-01-19 04:19:11,405 INFO org.apache.flink.yarn.YarnResourceManager [] - Registering TaskManager with ResourceID container_e283_1609125504851_3620_01_000002 (akka.tcp://flink@rphf1hsn026.qa.webex.com:46785/user/rpc/taskmanager_0) at ResourceManager
> 2021-01-19 04:19:11,506 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - CHAIN DataSource (at main(WordCount.java:69) (org.apache.flink.api.java.io.TextInputFormat)) -> FlatMap (FlatMap at main(WordCount.java:84)) -> Combine (SUM(1), at main(WordCount.java:87) (1/1) (10cf614584617de770c6fd0f0aad4db7) switched from SCHEDULED to DEPLOYING.
> 2021-01-19 04:19:11,507 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Deploying CHAIN DataSource (at main(WordCount.java:69) (org.apache.flink.api.java.io.TextInputFormat)) -> FlatMap (FlatMap at main(WordCount.java:84)) -> Combine (SUM(1), at main(WordCount.java:87) (1/1) (attempt #0) to container_e283_1609125504851_3620_01_000002 @ rphf1hsn026.qa.webex.com (dataPort=46647)
> 2021-01-19 04:19:11,608 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - CHAIN DataSource (at main(WordCount.java:69) (org.apache.flink.api.java.io.TextInputFormat)) -> FlatMap (FlatMap at main(WordCount.java:84)) -> Combine (SUM(1), at main(WordCount.java:87) (1/1) (10cf614584617de770c6fd0f0aad4db7) switched from DEPLOYING to RUNNING.
> 2021-01-19 04:19:11,792 INFO org.apache.flink.api.common.io.LocatableInputSplitAssigner [] - Assigning remote split to host rphf1hsn026
> 2021-01-19 04:19:11,847 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - CHAIN DataSource (at main(WordCount.java:69) (org.apache.flink.api.java.io.TextInputFormat)) -> FlatMap (FlatMap at main(WordCount.java:84)) -> Combine (SUM(1), at main(WordCount.java:87) (1/1) (10cf614584617de770c6fd0f0aad4db7) switched from RUNNING to FAILED on org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot@3e19cc76.
> java.io.IOException: Error opening the Input Split file:/data/d1/yarn/nm/usercache/yanta/appcache/application_1609125504851_3620/container_e283_1609125504851_3620_01_000001/conf/cmp_online.cfg [0,71]: /data/d1/yarn/nm/usercache/yanta/appcache/application_1609125504851_3620/container_e283_1609125504851_3620_01_000001/conf/cmp_online.cfg (No such file or directory)
>     at org.apache.flink.api.common.io.FileInputFormat.open(FileInputFormat.java:824) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:470) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:47) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:173) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_272]
> Caused by: java.io.FileNotFoundException: /data/d1/yarn/nm/usercache/yanta/appcache/application_1609125504851_3620/container_e283_1609125504851_3620_01_000001/conf/cmp_online.cfg (No such file or directory)
>     at java.io.FileInputStream.open0(Native Method) ~[?:1.8.0_272]
>     at java.io.FileInputStream.open(FileInputStream.java:195) ~[?:1.8.0_272]
>     at java.io.FileInputStream.<init>(FileInputStream.java:138) ~[?:1.8.0_272]
>     at org.apache.flink.core.fs.local.LocalDataInputStream.<init>(LocalDataInputStream.java:50) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.core.fs.local.LocalFileSystem.open(LocalFileSystem.java:142) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.api.common.io.FileInputFormat$InputSplitOpenThread.run(FileInputFormat.java:996) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
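One plausible reading of the failure, offered as an assumption rather than a confirmed diagnosis: the input split carries an absolute path under container_..._000001, so the relative --input value ./conf/cmp_online.cfg appears to have been qualified against the job manager container's working directory, and the task manager then opened that absolute path on a node where it does not exist. The sketch below only illustrates that mechanism with plain java.nio path resolution; RelativePathDemo and qualify are hypothetical names, and it does not reproduce Flink's own code.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

/** Illustration only: one relative path, two different working directories. */
final class RelativePathDemo {

    private RelativePathDemo() {}

    /** Qualifies a relative path against a working directory, purely lexically (no filesystem access). */
    public static Path qualify(String workingDir, String relativePath) {
        return Paths.get(workingDir).resolve(relativePath).normalize();
    }

    public static void main(String[] args) {
        String appDir = "/data/d1/yarn/nm/usercache/yanta/appcache/application_1609125504851_3620/";
        String jobManagerDir = appDir + "container_e283_1609125504851_3620_01_000001";
        String taskManagerDir = appDir + "container_e283_1609125504851_3620_01_000002";
        String input = "./conf/cmp_online.cfg";

        // Path baked into the split if it is resolved in the job manager container (as the log suggests).
        System.out.println(qualify(jobManagerDir, input));
        // Path where the shipped copy actually lives on the task manager node.
        System.out.println(qualify(taskManagerDir, input));
    }
}
```

Under this reading, the two nodes agree only when both containers share a working directory prefix that exists locally, which would be consistent with the job succeeding when the job manager and task manager run on the same node.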



--
This message was sent by Atlassian Jira
(v8.3.4#803005)