Posted to user@flink.apache.org by spoon_lz <97...@qq.com> on 2018/08/15 02:58:15 UTC

How to submit flink job on yarn by java code

My project automatically generates Flink job jars and then submits them to a
YARN cluster for execution, retrieving the ApplicationId. I find that an
error is reported after execution.



Then I searched for the error on Google and found that the cause was that I
had not set the Hadoop environment variables.
But my jar is not submitted through the ./bin/flink script; it is submitted
via CliFrontend.java instead. How can I solve this problem?

My code like :





--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: How to submit flink job on yarn by java code

Posted by Piotr Nowojski <pi...@data-artisans.com>.
Hi,

Is this path accessible on the container? If not, use some distributed file system, NFS, or the -yt/--yarnship option of the CLI.

Please also take a look at https://lists.apache.org/thread.html/%3CCAF=1nJ8GONoqux7czxpUxAf7L3p=-E_ePSTHk0uWa=GRyG=2nw@mail.gmail.com%3E <https://lists.apache.org/thread.html/%3CCAF=1nJ8GONoqux7czxpUxAf7L3p=-E_ePSTHk0uWa=GRyG=2nw@mail.gmail.com%3E>
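As a plain-Java illustration of that advice (the class and helper names below are hypothetical, not a Flink API): fail fast with a clear message if a directory the deployment depends on is not actually accessible from where the check runs:

```java
import java.io.File;

public class PathCheck {

    // Illustrative helper only (not part of Flink): verify that a path the
    // deployment will try to read actually exists and is readable, so a bad
    // HADOOP_CONF_DIR or ship path fails before the YARN containers do.
    static String verifyReadableDir(String path) {
        File dir = new File(path);
        if (!dir.isDirectory() || !dir.canRead()) {
            return "NOT ACCESSIBLE: " + path;
        }
        return "ok: " + path;
    }

    public static void main(String[] args) {
        // e.g. the YARN client configuration directory from this thread
        System.out.println(verifyReadableDir("/usr/ndp/current/yarn_client/conf"));
    }
}
```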

Piotrek

> On 16 Aug 2018, at 11:05, spoon_lz <97...@qq.com> wrote:
> 
> Sorry, I don't know why the code and error are not visible.
> The error is :
> The program finished with the following exception:
> 
> org.apache.flink.client.deployment.ClusterDeploymentException: Could not
> deploy Yarn job cluster.
> 	at
> org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:82)
> 	at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:239)
> 	at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:214)
> 	at
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1025)
> 	at flink.SubmitDemo.submit(SubmitDemo.java:75)
> 	at flink.SubmitDemo.main(SubmitDemo.java:50)
> Caused by:
> org.apache.flink.yarn.AbstractYarnClusterDescriptor$YarnDeploymentException:
> The YARN application unexpectedly switched to state FAILED during
> deployment. 
> Diagnostics from YARN: Application application_1526888270443_0090 failed 2
> times due to AM Container for appattempt_1526888270443_0090_000002 exited
> with  exitCode: -1000
> For more detailed output, check application tracking
> page:http://cluster1:8088/cluster/app/application_1526888270443_0090Then,
> click on links to logs of each attempt.
> Diagnostics: File
> file:/home/demo/.flink/application_1526888270443_0090/application_1526888270443_0090-flink-conf.yaml9192370388166197716.tmp
> does not exist
> java.io.FileNotFoundException: File
> file:/home/demo/.flink/application_1526888270443_0090/application_1526888270443_0090-flink-conf.yaml9192370388166197716.tmp
> does not exist
> 	at
> org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:611)
> 	at
> org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:824)
> 	at
> org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:601)
> 	at
> org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:421)
> 	at org.apache.hadoop.yarn.util.FSDownload.copy(FSDownload.java:253)
> 	at org.apache.hadoop.yarn.util.FSDownload.access$000(FSDownload.java:63)
> 	at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:361)
> 	at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:359)
> 	at java.security.AccessController.doPrivileged(Native Method)
> 	at javax.security.auth.Subject.doAs(Subject.java:422)
> 	at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
> 	at org.apache.hadoop.yarn.util.FSDownload.call(FSDownload.java:358)
> 	at org.apache.hadoop.yarn.util.FSDownload.call(FSDownload.java:62)
> 	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> 	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> 	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> 	at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> 	at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> 	at java.lang.Thread.run(Thread.java:745)
> 
> Failing this attempt. Failing the application.
> If log aggregation is enabled on your cluster, use this command to further
> investigate the issue:
> yarn logs -applicationId application_1526888270443_0090
> 	at
> org.apache.flink.yarn.AbstractYarnClusterDescriptor.startAppMaster(AbstractYarnClusterDescriptor.java:1059)
> 	at
> org.apache.flink.yarn.AbstractYarnClusterDescriptor.deployInternal(AbstractYarnClusterDescriptor.java:532)
> 	at
> org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:75)
> 	... 5 more
> 
> and my code like :
> 
> import java.io.File;
> import java.lang.reflect.Field;
> import java.util.ArrayList;
> import java.util.Collections;
> import java.util.HashMap;
> import java.util.List;
> import java.util.Map;
> 
> import org.apache.commons.cli.CommandLine;
> import org.apache.flink.client.cli.CliFrontend;
> import org.apache.flink.client.cli.CliFrontendParser;
> import org.apache.flink.client.cli.CustomCommandLine;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.configuration.GlobalConfiguration;
> import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
> import org.apache.hadoop.yarn.api.records.ApplicationId;
> 
> public class SubmitDemo {
> 
>     private final String ENV_CONF = "/usr/ndp/current/yarn_client/conf";
>     private final String FLINK_CONF = "/home/demo/flink-1.5.1/conf";
>     private static final String JAR_FILE =
>             "/home/demo/lz/flink1.5_demo-1.16-SNAPSHOT-jar-with-dependencies.jar";
> 
> 
>    public static void main(String[] args) {
> 
>        SubmitDemo demo = new SubmitDemo();
>        demo.before();
>        List<String> parameters = new ArrayList<>();
>        parameters.add("run");
>        parameters.add("-d");
>        parameters.add("-m");
>        parameters.add("yarn-cluster");
>        parameters.add("-ynm");
>        parameters.add("lz_test_alone");
>        parameters.add("-yn");
>        parameters.add("4");
>        parameters.add("-ytm");
>        parameters.add("4096");
>        parameters.add("-yjm");
>        parameters.add("1024");
>        parameters.add("-c");
>        parameters.add("flink.Demo");
>        parameters.add(JAR_FILE);
> 
>        try {
>            demo.submit(parameters.toArray(new String[parameters.size()]));
>        } catch (Exception e) {
>            e.printStackTrace();
>        }
>    }
> 
>     public void submit(String[] args) throws Exception {
> 
>         final String configurationDirectory = ENV_CONF;
> 
>         File configFile = new File(FLINK_CONF);
> 
>         final Configuration flinkConfiguration =
>                 GlobalConfiguration.loadConfiguration(configFile.getAbsolutePath());
> 
>         FlinkYarnSessionCli cli = new FlinkYarnSessionCli(
>                 flinkConfiguration, configurationDirectory, "y", "yarn");
> 
>         final List<CustomCommandLine<?>> customCommandLines =
>                 CliFrontend.loadCustomCommandLines(
>                         flinkConfiguration,
>                         configurationDirectory);
> 
>         CliFrontend testFrontend =
>                 new CliFrontend(flinkConfiguration, customCommandLines);
>         // submit
>         testFrontend.parseParameters(args);
>         CommandLine commandLine = CliFrontendParser.parse(
>                 CliFrontendParser.getRunCommandOptions(),
>                 args,
>                 true);
>         final ApplicationId clusterId = cli.getClusterId(commandLine);
>         System.out.println("ApplicationId=" + clusterId);
>     }
> 
>     // SET HADOOP ENV via reflection into the JVM's cached environment map.
>     // This is a well-known hack and may break on newer JDKs.
>     private void before() {
>         Map<String, String> newenv = new HashMap<>();
>         newenv.put("HADOOP_CONF_DIR", ENV_CONF);
>         newenv.put("YARN_CONF_DIR", ENV_CONF);
>         try {
>             Class<?> processEnvironmentClass =
>                     Class.forName("java.lang.ProcessEnvironment");
>             Field theEnvironmentField =
>                     processEnvironmentClass.getDeclaredField("theEnvironment");
>             theEnvironmentField.setAccessible(true);
>             Map<String, String> env =
>                     (Map<String, String>) theEnvironmentField.get(null);
>             env.putAll(newenv);
>             Field theCaseInsensitiveEnvironmentField =
>                     processEnvironmentClass.getDeclaredField("theCaseInsensitiveEnvironment");
>             theCaseInsensitiveEnvironmentField.setAccessible(true);
>             Map<String, String> cienv =
>                     (Map<String, String>) theCaseInsensitiveEnvironmentField.get(null);
>             cienv.putAll(newenv);
>         } catch (ReflectiveOperationException e) {
>             // Fallback: patch the map behind System.getenv(); these
>             // reflective lookups throw checked exceptions as well.
>             try {
>                 Class<?>[] classes = Collections.class.getDeclaredClasses();
>                 Map<String, String> env = System.getenv();
>                 for (Class<?> cl : classes) {
>                     if ("java.util.Collections$UnmodifiableMap".equals(cl.getName())) {
>                         Field field = cl.getDeclaredField("m");
>                         field.setAccessible(true);
>                         Object obj = field.get(env);
>                         Map<String, String> map = (Map<String, String>) obj;
>                         map.clear();
>                         map.putAll(newenv);
>                     }
>                 }
>             } catch (ReflectiveOperationException inner) {
>                 throw new RuntimeException("could not set HADOOP env", inner);
>             }
>         }
>     }
> 
> 
> }
> 
> 
> The error is that the file
> "/home/demo/.flink/application_1526888270443_0090/application_1526888270443_0090-flink-conf.yaml9192370388166197716.tmp"
> is not found, but I can find this file.
> Previously I thought it was an environment-variable problem and added
> before(), but the error is still reported.
> 
> 
> 
> 
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: How to submit flink job on yarn by java code

Posted by spoon_lz <97...@qq.com>.
Sorry, I don't know why the code and error are not visible.
The error is :
 The program finished with the following exception:

org.apache.flink.client.deployment.ClusterDeploymentException: Could not
deploy Yarn job cluster.
	at
org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:82)
	at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:239)
	at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:214)
	at
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1025)
	at flink.SubmitDemo.submit(SubmitDemo.java:75)
	at flink.SubmitDemo.main(SubmitDemo.java:50)
Caused by:
org.apache.flink.yarn.AbstractYarnClusterDescriptor$YarnDeploymentException:
The YARN application unexpectedly switched to state FAILED during
deployment. 
Diagnostics from YARN: Application application_1526888270443_0090 failed 2
times due to AM Container for appattempt_1526888270443_0090_000002 exited
with  exitCode: -1000
For more detailed output, check application tracking
page:http://cluster1:8088/cluster/app/application_1526888270443_0090Then,
click on links to logs of each attempt.
Diagnostics: File
file:/home/demo/.flink/application_1526888270443_0090/application_1526888270443_0090-flink-conf.yaml9192370388166197716.tmp
does not exist
java.io.FileNotFoundException: File
file:/home/demo/.flink/application_1526888270443_0090/application_1526888270443_0090-flink-conf.yaml9192370388166197716.tmp
does not exist
	at
org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:611)
	at
org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:824)
	at
org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:601)
	at
org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:421)
	at org.apache.hadoop.yarn.util.FSDownload.copy(FSDownload.java:253)
	at org.apache.hadoop.yarn.util.FSDownload.access$000(FSDownload.java:63)
	at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:361)
	at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:359)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
	at org.apache.hadoop.yarn.util.FSDownload.call(FSDownload.java:358)
	at org.apache.hadoop.yarn.util.FSDownload.call(FSDownload.java:62)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

Failing this attempt. Failing the application.
If log aggregation is enabled on your cluster, use this command to further
investigate the issue:
yarn logs -applicationId application_1526888270443_0090
	at
org.apache.flink.yarn.AbstractYarnClusterDescriptor.startAppMaster(AbstractYarnClusterDescriptor.java:1059)
	at
org.apache.flink.yarn.AbstractYarnClusterDescriptor.deployInternal(AbstractYarnClusterDescriptor.java:532)
	at
org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:75)
	... 5 more

and my code like :

import java.io.File;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.commons.cli.CommandLine;
import org.apache.flink.client.cli.CliFrontend;
import org.apache.flink.client.cli.CliFrontendParser;
import org.apache.flink.client.cli.CustomCommandLine;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
import org.apache.hadoop.yarn.api.records.ApplicationId;

public class SubmitDemo {

    private final String ENV_CONF = "/usr/ndp/current/yarn_client/conf";
    private final String FLINK_CONF = "/home/demo/flink-1.5.1/conf";
    private static final String JAR_FILE =
            "/home/demo/lz/flink1.5_demo-1.16-SNAPSHOT-jar-with-dependencies.jar";


    public static void main(String[] args) {

        SubmitDemo demo = new SubmitDemo();
        demo.before();
        List<String> parameters = new ArrayList<>();
        parameters.add("run");
        parameters.add("-d");
        parameters.add("-m");
        parameters.add("yarn-cluster");
        parameters.add("-ynm");
        parameters.add("lz_test_alone");
        parameters.add("-yn");
        parameters.add("4");
        parameters.add("-ytm");
        parameters.add("4096");
        parameters.add("-yjm");
        parameters.add("1024");
        parameters.add("-c");
        parameters.add("flink.Demo");
        parameters.add(JAR_FILE);

        try {
            demo.submit(parameters.toArray(new String[parameters.size()]));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public void submit(String[] args) throws Exception {

        final String configurationDirectory = ENV_CONF;

        File configFile = new File(FLINK_CONF);

        final Configuration flinkConfiguration =
                GlobalConfiguration.loadConfiguration(configFile.getAbsolutePath());

        FlinkYarnSessionCli cli = new FlinkYarnSessionCli(
                flinkConfiguration, configurationDirectory, "y", "yarn");

        final List<CustomCommandLine<?>> customCommandLines =
                CliFrontend.loadCustomCommandLines(
                        flinkConfiguration,
                        configurationDirectory);

        CliFrontend testFrontend =
                new CliFrontend(flinkConfiguration, customCommandLines);
        // submit
        testFrontend.parseParameters(args);
        CommandLine commandLine = CliFrontendParser.parse(
                CliFrontendParser.getRunCommandOptions(),
                args,
                true);
        final ApplicationId clusterId = cli.getClusterId(commandLine);
        System.out.println("ApplicationId=" + clusterId);
    }

    // SET HADOOP ENV via reflection into the JVM's cached environment map.
    // This is a well-known hack and may break on newer JDKs.
    private void before() {
        Map<String, String> newenv = new HashMap<>();
        newenv.put("HADOOP_CONF_DIR", ENV_CONF);
        newenv.put("YARN_CONF_DIR", ENV_CONF);
        try {
            Class<?> processEnvironmentClass =
                    Class.forName("java.lang.ProcessEnvironment");
            Field theEnvironmentField =
                    processEnvironmentClass.getDeclaredField("theEnvironment");
            theEnvironmentField.setAccessible(true);
            Map<String, String> env =
                    (Map<String, String>) theEnvironmentField.get(null);
            env.putAll(newenv);
            Field theCaseInsensitiveEnvironmentField =
                    processEnvironmentClass.getDeclaredField("theCaseInsensitiveEnvironment");
            theCaseInsensitiveEnvironmentField.setAccessible(true);
            Map<String, String> cienv =
                    (Map<String, String>) theCaseInsensitiveEnvironmentField.get(null);
            cienv.putAll(newenv);
        } catch (ReflectiveOperationException e) {
            // Fallback: patch the map behind System.getenv(); these
            // reflective lookups throw checked exceptions as well.
            try {
                Class<?>[] classes = Collections.class.getDeclaredClasses();
                Map<String, String> env = System.getenv();
                for (Class<?> cl : classes) {
                    if ("java.util.Collections$UnmodifiableMap".equals(cl.getName())) {
                        Field field = cl.getDeclaredField("m");
                        field.setAccessible(true);
                        Object obj = field.get(env);
                        Map<String, String> map = (Map<String, String>) obj;
                        map.clear();
                        map.putAll(newenv);
                    }
                }
            } catch (ReflectiveOperationException inner) {
                throw new RuntimeException("could not set HADOOP env", inner);
            }
        }
    }


}


The error is that the file
"/home/demo/.flink/application_1526888270443_0090/application_1526888270443_0090-flink-conf.yaml9192370388166197716.tmp"
is not found, but I can find this file.
Previously I thought it was an environment-variable problem and added
before(), but the error is still reported.
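For comparison, here is a self-contained sketch of a less fragile alternative to the before() reflection hack: run the actual submission in a child process, where ProcessBuilder can set HADOOP_CONF_DIR / YARN_CONF_DIR normally. The class and helper below are illustrative only (and assume a Unix-like host); in a real submitter the child command would be the java invocation of the Flink client, not "sh":

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;

public class ChildEnvDemo {

    // Start a child process with the variable set through ProcessBuilder
    // and return the value the child actually observes.
    static String childSees(String key, String value) {
        try {
            ProcessBuilder pb = new ProcessBuilder(
                    "sh", "-c", "printf '%s' \"$" + key + "\"");
            pb.environment().put(key, value);
            Process p = pb.start();
            try (BufferedReader r = new BufferedReader(
                    new InputStreamReader(p.getInputStream(), StandardCharsets.UTF_8))) {
                String seen = r.readLine();
                p.waitFor();
                return seen;
            }
        } catch (IOException | InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(childSees("HADOOP_CONF_DIR", "/usr/ndp/current/yarn_client/conf"));
    }
}
```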




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: How to submit flink job on yarn by java code

Posted by Rong Rong <wa...@gmail.com>.
I don't think your exception / code was attached.

In general, this largely depends on your setup. Are you trying to set up a
long-running YARN session cluster, or are you trying to submit directly to
a YARN cluster? [1]
We have an open-sourced project [2] with a similar requirement of
submitting compiled jobs on YARN, where we directly extend Flink's
AbstractYarnClusterDescriptor; maybe it can serve as a reference for you.

Thanks,
Rong

[1]
https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/yarn_setup.html#start-a-long-running-flink-cluster-on-yarn
[2]
https://github.com/uber/AthenaX/blob/master/athenax-backend/src/main/java/com/uber/athenax/backend/server/yarn/AthenaXYarnClusterDescriptor.java

On Tue, Aug 14, 2018 at 7:58 PM spoon_lz <97...@qq.com> wrote:

> My project automatically generates Flink job jars and then submits them to
> a YARN cluster for execution, retrieving the ApplicationId. I find that an
> error is reported after execution.
>
>
>
> Then I searched for the error on Google and found that the cause was that
> I had not set the Hadoop environment variables.
> But my jar is not submitted through the ./bin/flink script; it is
> submitted via CliFrontend.java instead. How can I solve this problem?
>
> My code like :
>
>
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>