Posted to user@flink.apache.org by "Hailu, Andreas [Engineering]" <An...@gs.com> on 2021/08/13 19:39:43 UTC

Upgrading from Flink on YARN 1.9 to 1.11

Hello folks!

We're looking to upgrade from 1.9 to 1.11. Our Flink applications run on YARN; each application has its own cluster, and each application submits multiple jobs.

Our current submission command looks like this:
$ run -m yarn-cluster --class com.class.name.Here -p 2 -yqu queue-name -ynm app-name -yn 1 -ys 2 -yjm 4096 -ytm 12288 /path/to/artifact.jar -application-args-go-here

The behavior observed in versions <= 1.9 is the following:

1.     A Flink cluster gets deployed to YARN

2.     Our application code is run, building graphs and submitting jobs

When we rebuild and submit using 1.11.2, we now observe the following:

1.     Our application code is run, building graphs and submitting jobs

2.     A Flink cluster gets deployed to YARN once execute() is invoked

I presume this is a result of FLIP-85 [1]?

This change in behavior is a problem for us, as our application is multi-threaded and each thread submits its own job to the Flink cluster. What we see is that the first thread to call execute() submits a job to YARN, and the others fail with a ClusterDeploymentException.

2021-08-13 14:47:42,299 [flink-thread-#1] INFO  YarnClusterDescriptor - Cluster specification: ClusterSpecification{masterMemoryMB=4096, taskManagerMemoryMB=18432, slotsPerTaskManager=2}
2021-08-13 14:47:42,299 [flink-thread-#2] INFO  YarnClusterDescriptor - Cluster specification: ClusterSpecification{masterMemoryMB=4096, taskManagerMemoryMB=18432, slotsPerTaskManager=2}
2021-08-13 14:47:42,304 [flink-thread-#1] WARN  PluginConfig - The plugins directory [plugins] does not exist.
2021-08-13 14:47:42,304 [flink-thread-#2] WARN  PluginConfig - The plugins directory [plugins] does not exist.
Listening for transport dt_socket at address: 5005
2021-08-13 14:47:46,716 [flink-thread-#2] WARN  PluginConfig - The plugins directory [plugins] does not exist.
2021-08-13 14:47:46,716 [flink-thread-#1] WARN  PluginConfig - The plugins directory [plugins] does not exist.
2021-08-13 14:47:54,820 [flink-thread-#1] INFO  YarnClusterDescriptor - Adding delegation token to the AM container.
2021-08-13 14:47:54,837 [flink-thread-#1] INFO  DFSClient - Created HDFS_DELEGATION_TOKEN token 56208379 for delp on ha-hdfs:d279536
2021-08-13 14:47:54,860 [flink-thread-#1] INFO  TokenCache - Got dt for hdfs://d279536; Kind: HDFS_DELEGATION_TOKEN, Service: ha-hdfs:d279536, Ident: (HDFS_DELEGATION_TOKEN token 56208379 for user)
2021-08-13 14:47:54,860 [flink-thread-#1] INFO  Utils - Attempting to obtain Kerberos security token for HBase
2021-08-13 14:47:54,861 [flink-thread-#1] INFO  Utils - HBase is not available (not packaged with this application): ClassNotFoundException : "org.apache.hadoop.hbase.HBaseConfiguration".
2021-08-13 14:47:54,901 [flink-thread-#1] INFO  YarnClusterDescriptor - Submitting application master application_1628393898291_71530
2021-08-13 14:47:54,904 [flink-thread-#2] ERROR FlowDataBase - Exception running data flow for flink-thread-#2
org.apache.flink.client.deployment.ClusterDeploymentException: Could not deploy Yarn job cluster.
        at org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:431)
        at org.apache.flink.client.deployment.executors.AbstractJobClusterExecutor.execute(AbstractJobClusterExecutor.java:70)
        at org.apache.flink.api.java.ExecutionEnvironment.executeAsync(ExecutionEnvironment.java:973)
        at org.apache.flink.client.program.ContextEnvironment.executeAsync(ContextEnvironment.java:124)
        at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:72)
        ...
Caused by: java.io.IOException: Filesystem closed
        at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:826)
        at org.apache.hadoop.hdfs.DFSClient.listPaths(DFSClient.java:2152)
        at org.apache.hadoop.hdfs.DFSClient.listPaths(DFSClient.java:2138)
        at org.apache.hadoop.hdfs.DistributedFileSystem.listStatusInternal(DistributedFileSystem.java:919)
        at org.apache.hadoop.hdfs.DistributedFileSystem.access$600(DistributedFileSystem.java:114)
        ...
Caused by: java.lang.ExceptionInInitializerError
        at com.sun.jersey.core.spi.factory.MessageBodyFactory.initReaders(MessageBodyFactory.java:182)
        at com.sun.jersey.core.spi.factory.MessageBodyFactory.initReaders(MessageBodyFactory.java:175)
        at com.sun.jersey.core.spi.factory.MessageBodyFactory.init(MessageBodyFactory.java:162)
        at com.sun.jersey.api.client.Client.init(Client.java:342)
        at com.sun.jersey.api.client.Client.access$000(Client.java:118)
        at com.sun.jersey.api.client.Client$1.f(Client.java:191)
        at com.sun.jersey.api.client.Client$1.f(Client.java:187)
        at com.sun.jersey.spi.inject.Errors.processWithErrors(Errors.java:193)
        at com.sun.jersey.api.client.Client.<init>(Client.java:187)
       at com.sun.jersey.api.client.Client.<init>(Client.java:170)
        at org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl.serviceInit(TimelineClientImpl.java:285)

Is the only solution here to move to application mode [2]? Doing so would imply a migration requirement (which may have its own set of problems).

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-85+Flink+Application+Mode
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/yarn_setup.html#run-an-application-in-application-mode

Best,
Andreas

RE: Upgrading from Flink on YARN 1.9 to 1.11

Posted by "Hailu, Andreas [Engineering]" <An...@gs.com>.
Hi David, I was able to get this working using your suggestion:


1)    Deploy a Flink YARN session cluster, noting the host and port of the session's JobManager.

2)    Submit each Flink job against that session, i.e. submit with the ‘-m host:port’ option (rough sketch below).
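
For reference, a rough sketch of what those two steps look like on the command line. The queue, memory, slot, and artifact values are just carried over from our old per-job command, and the JobManager host:port placeholder is whatever the yarn-session startup output reports:

$ yarn-session.sh -nm app-name -qu queue-name -s 2 -jm 4096m -tm 12288m -d
$ flink run -m <jobmanager-host>:<port> -p 2 --class com.class.name.Here /path/to/artifact.jar -application-args-go-here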

Thanks for clearing things up.

// ah

Re: Upgrading from Flink on YARN 1.9 to 1.11

Posted by David Morávek <dm...@apache.org>.
Hi Andreas,

the problem here is that the command you're using starts a per-job
cluster (which is clear from the deployment method
"YarnClusterDescriptor.deployJobCluster"). Apparently the `-m yarn-cluster`
flag is deprecated and no longer supported; I think this is something we
should remove completely in the near future. It was always supposed to
start your job in per-job mode, but unfortunately in older versions this
was simulated using a session cluster, so I'd say it has just worked by
accident (an undocumented bug / feature).

What you really want to do is start a session cluster upfront and then
use the `yarn-session` deployment target (where you need to provide the
YARN application id so Flink can find the active JobManager). This is
well documented in the YARN section of the docs [1].
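
For illustration, a minimal sketch of that flow (the application id is a
placeholder taken from the yarn-session output, and the jar / class are
just examples):

$ yarn-session.sh -d -nm my-session
$ flink run -t yarn-session -Dyarn.application.id=application_XXXX_YY -c com.class.name.Here /path/to/artifact.jar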

Can you please try this approach and let me know if it helps?

[1]
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/yarn/#session-mode

Best,
D.

RE: Upgrading from Flink on YARN 1.9 to 1.11

Posted by "Hailu, Andreas [Engineering]" <An...@gs.com>.
Hi David,

You’re correct about classpathing problems – thanks for your help in spotting them. I was able to get past that exception by removing some conflicting packages in my shaded JAR, but I’m seeing something else that’s interesting. With two threads trying to submit jobs, one of the threads is able to submit and process data successfully, while the other thread fails.

Log snippet:
2021-08-16 13:43:12,893 [thread-1] INFO  YarnClusterDescriptor - Cluster specification: ClusterSpecification{masterMemoryMB=4096, taskManagerMemoryMB=18432, slotsPerTaskManager=2}
2021-08-16 13:43:12,893 [thread-2] INFO  YarnClusterDescriptor - Cluster specification: ClusterSpecification{masterMemoryMB=4096, taskManagerMemoryMB=18432, slotsPerTaskManager=2}
2021-08-16 13:43:12,897 [thread-2] WARN  PluginConfig - The plugins directory [plugins] does not exist.
2021-08-16 13:43:12,897 [thread-1] WARN  PluginConfig - The plugins directory [plugins] does not exist.
2021-08-16 13:43:13,104 [thread-2] WARN  PluginConfig - The plugins directory [plugins] does not exist.
2021-08-16 13:43:13,104 [thread-1] WARN  PluginConfig - The plugins directory [plugins] does not exist.
2021-08-16 13:43:20,475 [thread-1] INFO  YarnClusterDescriptor - Adding delegation token to the AM container.
2021-08-16 13:43:20,488 [thread-1] INFO  DFSClient - Created HDFS_DELEGATION_TOKEN token 56247060 for delp on ha-hdfs:d279536
2021-08-16 13:43:20,512 [thread-1] INFO  TokenCache - Got dt for hdfs://d279536; Kind: HDFS_DELEGATION_TOKEN, Service: ha-hdfs:d279536, Ident: (HDFS_DELEGATION_TOKEN token 56247060 for delp)
2021-08-16 13:43:20,513 [thread-1] INFO  Utils - Attempting to obtain Kerberos security token for HBase
2021-08-16 13:43:20,513 [thread-1] INFO  Utils - HBase is not available (not packaged with this application): ClassNotFoundException : "org.apache.hadoop.hbase.HBaseConfiguration".
2021-08-16 13:43:20,564 [thread-2] WARN  YarnClusterDescriptor - Add job graph to local resource fail.
2021-08-16 13:43:20,570 [thread-1] INFO  YarnClusterDescriptor - Submitting application master application_1628992879699_11275
2021-08-16 13:43:20,570 [thread-2] ERROR FlowDataBase - Exception running data flow for thread-2
org.apache.flink.client.deployment.ClusterDeploymentException: Could not deploy Yarn job cluster.
            at org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:431)
            at org.apache.flink.client.deployment.executors.AbstractJobClusterExecutor.execute(AbstractJobClusterExecutor.java:70)
            at org.apache.flink.api.java.ExecutionEnvironment.executeAsync(ExecutionEnvironment.java:973)
            at org.apache.flink.client.program.ContextEnvironment.executeAsync(ContextEnvironment.java:124)
            at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:72)
            at com.gs.ep.da.lake.refinerlib.flink.ExecutionEnvironmentWrapper.execute(ExecutionEnvironmentWrapper.java:49)
...
Caused by: java.io.IOException: Filesystem closed
            at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:826)
            at org.apache.hadoop.hdfs.DFSClient.listPaths(DFSClient.java:2152)
...
2021-08-16 13:43:20,979 [thread-1] INFO  TimelineClientImpl - Timeline service address: http://d279536-003.dc.gs.com:8188/ws/v1/timeline/
2021-08-16 13:43:21,377 [thread-1] INFO  YarnClientImpl - Submitted application application_1628992879699_11275
2021-08-16 13:43:21,377 [thread-1] INFO  YarnClusterDescriptor - Waiting for the cluster to be allocated
2021-08-16 13:43:21,379 [thread-1] INFO  YarnClusterDescriptor - Deploying cluster, current state ACCEPTED
2021-08-16 13:43:28,435 [thread-1] INFO  YarnClusterDescriptor - YARN application has been deployed successfully.
2021-08-16 13:43:28,436 [thread-1] INFO  YarnClusterDescriptor - Found Web Interface d279536-023.dc.gs.com:41019 of application 'application_1628992879699_11275'.
2021-08-16 13:43:28,443 [thread-1] INFO  AbstractJobClusterExecutor - Job has been submitted with JobID e15bac7f9fb4b8b0d60f996c3a0880f3
Job has been submitted with JobID e15bac7f9fb4b8b0d60f996c3a0880f3
2021-08-16 13:43:38,629 [FlinkJobSubmitter.Poll] INFO  FlinkJobSubmitter$2 - job completed for thread-2 with parallelism 1
Program execution finished
Job with JobID e15bac7f9fb4b8b0d60f996c3a0880f3 has finished.

I’ve generated and sent you a signup link to our firm’s secure document-sharing app called Lockbox. In the repository, I’ve uploaded both our full client and YARN app logs (named half_failure-client_log and half_failure-yarn-log, respectively) in a directory named Flink support logs/Flink 1.11/1.11.2_POC. The logs are quite brief – would you be able to have a look and see if there’s something we’re doing that’s clearly wrong?

Something I did notice is that with the upgrade, our submissions now go through the newly introduced ContextEnvironment#executeAsync method. If it means anything, our client doesn’t require asynchronous job submission.

// ah

Re: Upgrading from Flink on YARN 1.9 to 1.11

Posted by David Morávek <dm...@apache.org>.
Hi Andreas,

Per-job and session deployment modes should not be affected by this FLIP.
Application mode is just a new deployment mode (where the job driver runs
embedded within the JobManager) that co-exists with the other two.
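
To make the three YARN deployment modes concrete, a rough sketch of how
each one is selected from the 1.11 CLI (jar paths and the application id
are only placeholders):

$ flink run -t yarn-per-job /path/to/artifact.jar                                            # dedicated cluster per job
$ flink run -t yarn-session -Dyarn.application.id=application_XXXX_YY /path/to/artifact.jar  # attach to a running session
$ flink run-application -t yarn-application /path/to/artifact.jar                            # job driver runs inside the JobManager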

From the information you've provided, I'd say your actual problem is
this exception:

```
Caused by: java.lang.ExceptionInInitializerError
        at com.sun.jersey.core.spi.factory.MessageBodyFactory.initReaders(MessageBodyFactory.java:182)
        at com.sun.jersey.core.spi.factory.MessageBodyFactory.initReaders(MessageBodyFactory.java:175)
        at com.sun.jersey.core.spi.factory.MessageBodyFactory.init(MessageBodyFactory.java:162)
        at com.sun.jersey.api.client.Client.init(Client.java:342)
        at com.sun.jersey.api.client.Client.access$000(Client.java:118)
        at com.sun.jersey.api.client.Client$1.f(Client.java:191)
        at com.sun.jersey.api.client.Client$1.f(Client.java:187)
        at com.sun.jersey.spi.inject.Errors.processWithErrors(Errors.java:193)
        at com.sun.jersey.api.client.Client.<init>(Client.java:187)
        at com.sun.jersey.api.client.Client.<init>(Client.java:170)
        at org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl.serviceInit(TimelineClientImpl.java:285)
```


I've seen this exception a few times with Hadoop already and it's usually a
dependency / class-path problem. If you google for this you'll find many
references.


Best,

D.


