Posted to issues@flink.apache.org by "Tang Yan (Jira)" <ji...@apache.org> on 2021/01/19 08:25:00 UTC

[jira] [Updated] (FLINK-21023) Task Manager uses the container dir of Job Manager when running flink job on yarn-cluster.

     [ https://issues.apache.org/jira/browse/FLINK-21023?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Tang Yan updated FLINK-21023:
-----------------------------
    Description: 
I want to use the -yt (yarnship) option to distribute my config files to the YARN cluster and read them in my code. For this test I just used the Flink WordCount example.

Here is my submit command:

/opt/Flink/bin/flink run -m yarn-cluster -p 1 -yt /path/to/conf -c org.apache.flink.examples.java.wordcount.WordCount /opt/Flink/examples/batch/WordCount.jar --input conf/cmp_online.cfg

Test Result:

I found that if the job manager and task manager are launched on the same node, the job runs successfully. But when they run on different nodes, the job fails with the errors below. The conf folder has been distributed to the container cache dirs, such as file:/data/d1/yarn/nm/usercache/yanta/appcache/application_1609125504851_3620/container_e283_1609125504851_3620_01_000001/conf on the job manager node, and file:/data/d1/yarn/nm/usercache/yanta/appcache/application_1609125504851_3620/container_e283_1609125504851_3620_01_000002/conf on the task manager node. But why does the task manager load the conf file from the container_eXXX_000001 path (which is located on the job manager node)?

2021-01-19 04:19:11,405 INFO org.apache.flink.yarn.YarnResourceManager [] - Registering TaskManager with ResourceID container_e283_1609125504851_3620_01_000002 (akka.tcp://flink@rphf1hsn026.qa.webex.com:46785/user/rpc/taskmanager_0) at ResourceManager
2021-01-19 04:19:11,506 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - CHAIN DataSource (at main(WordCount.java:69) (org.apache.flink.api.java.io.TextInputFormat)) -> FlatMap (FlatMap at main(WordCount.java:84)) -> Combine (SUM(1), at main(WordCount.java:87) (1/1) (10cf614584617de770c6fd0f0aad4db7) switched from SCHEDULED to DEPLOYING.
2021-01-19 04:19:11,507 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Deploying CHAIN DataSource (at main(WordCount.java:69) (org.apache.flink.api.java.io.TextInputFormat)) -> FlatMap (FlatMap at main(WordCount.java:84)) -> Combine (SUM(1), at main(WordCount.java:87) (1/1) (attempt #0) to container_e283_1609125504851_3620_01_000002 @ rphf1hsn026.qa.webex.com (dataPort=46647)
2021-01-19 04:19:11,608 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - CHAIN DataSource (at main(WordCount.java:69) (org.apache.flink.api.java.io.TextInputFormat)) -> FlatMap (FlatMap at main(WordCount.java:84)) -> Combine (SUM(1), at main(WordCount.java:87) (1/1) (10cf614584617de770c6fd0f0aad4db7) switched from DEPLOYING to RUNNING.
2021-01-19 04:19:11,792 INFO org.apache.flink.api.common.io.LocatableInputSplitAssigner [] - Assigning remote split to host rphf1hsn026
2021-01-19 04:19:11,847 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - CHAIN DataSource (at main(WordCount.java:69) (org.apache.flink.api.java.io.TextInputFormat)) -> FlatMap (FlatMap at main(WordCount.java:84)) -> Combine (SUM(1), at main(WordCount.java:87) (1/1) (10cf614584617de770c6fd0f0aad4db7) switched from RUNNING to FAILED on org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot@3e19cc76.
java.io.IOException: Error opening the Input Split file:/data/d1/yarn/nm/usercache/yanta/appcache/application_1609125504851_3620/container_e283_1609125504851_3620_01_000001/conf/cmp_online.cfg [0,71]: /data/d1/yarn/nm/usercache/yanta/appcache/application_1609125504851_3620/container_e283_1609125504851_3620_01_000001/conf/cmp_online.cfg (No such file or directory)
    at org.apache.flink.api.common.io.FileInputFormat.open(FileInputFormat.java:824) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:470) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:47) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:173) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_272]
Caused by: java.io.FileNotFoundException: /data/d1/yarn/nm/usercache/yanta/appcache/application_1609125504851_3620/container_e283_1609125504851_3620_01_000001/conf/cmp_online.cfg (No such file or directory)
    at java.io.FileInputStream.open0(Native Method) ~[?:1.8.0_272]
    at java.io.FileInputStream.open(FileInputStream.java:195) ~[?:1.8.0_272]
    at java.io.FileInputStream.<init>(FileInputStream.java:138) ~[?:1.8.0_272]
    at org.apache.flink.core.fs.local.LocalDataInputStream.<init>(LocalDataInputStream.java:50) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.core.fs.local.LocalFileSystem.open(LocalFileSystem.java:142) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.api.common.io.FileInputFormat$InputSplitOpenThread.run(FileInputFormat.java:996) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
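
One possible reading of the log above, offered as an assumption and not a confirmed diagnosis: the relative path conf/cmp_online.cfg may be turned into an absolute path in the process that creates the input splits (the "Assigning remote split" line comes from the job manager side), and that absolute path is then opened on the task manager host, where the container_..._000001 directory does not exist. The sketch below illustrates only this general effect in plain Java. It does not call Flink or YARN. The class name RelativePathDemo, the helper resolveIn, and the shortened directory names are hypothetical stand-ins.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

public class RelativePathDemo {

    // Resolves a relative path against a given working directory, the way a
    // process resolves it against its own current directory.
    static Path resolveIn(Path workingDir, String relative) {
        return workingDir.resolve(relative).toAbsolutePath().normalize();
    }

    public static void main(String[] args) throws IOException {
        // The temp directory models the local disk of the task manager host.
        Path appCache = Files.createTempDirectory("appcache");

        // Hypothetical stand-ins for the two container directories. Only the
        // second one exists on this "host", as on the task manager node.
        Path jobManagerDir = appCache.resolve("container_01_000001");
        Path taskManagerDir = Files.createDirectories(appCache.resolve("container_01_000002"));

        // The shipped file is present in the task manager's own container dir.
        Path shipped = taskManagerDir.resolve("conf/cmp_online.cfg");
        Files.createDirectories(shipped.getParent());
        Files.write(shipped, "key=value\n".getBytes(StandardCharsets.UTF_8));

        String relative = "conf/cmp_online.cfg";

        // Resolved early, in the job manager's working directory, then shipped
        // to the task manager as an absolute path.
        Path resolvedOnJobManager = resolveIn(jobManagerDir, relative);

        // Resolved late, in the working directory of the process that reads it.
        Path resolvedOnTaskManager = resolveIn(taskManagerDir, relative);

        System.out.println(resolvedOnJobManager + " exists: " + Files.exists(resolvedOnJobManager));
        System.out.println(resolvedOnTaskManager + " exists: " + Files.exists(resolvedOnTaskManager));
    }
}
```

If this reading is right, it would also be consistent with the job succeeding when both containers land on the same node, since the job manager's container directory is then present on the local disk.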



> Task Manager uses the container dir of Job Manager when running flink job on yarn-cluster.
> ------------------------------------------------------------------------------------------
>
>                 Key: FLINK-21023
>                 URL: https://issues.apache.org/jira/browse/FLINK-21023
>             Project: Flink
>          Issue Type: Bug
>          Components: Client / Job Submission
>    Affects Versions: 1.12.0, 1.11.1
>            Reporter: Tang Yan
>            Priority: Critical



--
This message was sent by Atlassian Jira
(v8.3.4#803005)