Posted to user@flink.apache.org by Antonio Martínez Carratalá <am...@alto-analytics.com> on 2020/03/31 15:01:21 UTC

Flink in EMR configuration problem

Hi, I'm trying to run a job on a Flink cluster in Amazon EMR from Java code,
but I'm having some problems.

This is how I create the cluster:
------------------------------------------------------------------------------------------------------------
StepConfig copyJarStep = new StepConfig()
    .withName("copy-jar-step")
    .withActionOnFailure(ActionOnFailure.TERMINATE_CLUSTER)
    .withHadoopJarStep(new HadoopJarStepConfig("command-runner.jar")
        .withArgs("bash", "-c", "aws s3 cp s3://" + bucketName
            + "/lib/trendit-flink-jobs.jar /home/hadoop/trendit-flink-jobs.jar"));

List<StepConfig> stepConfigs = new ArrayList<>();
stepConfigs.add(copyJarStep);

Application flink = new Application().withName("Flink");

Configuration flinkConfiguration = new Configuration()
    .withClassification("flink-conf")
    .addPropertiesEntry("jobmanager.heap.size", "6g")
    .addPropertiesEntry("taskmanager.heap.size", "6g")
    .addPropertiesEntry("taskmanager.numberOfTaskSlots", "2");

RunJobFlowRequest request = new RunJobFlowRequest()
    .withName("cluster-" + executionKey)
    .withReleaseLabel("emr-5.26.0")
    .withApplications(flink)
    .withConfigurations(flinkConfiguration)
    .withServiceRole("EMR_DefaultRole")
    .withJobFlowRole("EMR_EC2_DefaultRole")
    .withLogUri(getWorkPath() + "logs")
    .withInstances(new JobFlowInstancesConfig()
        .withEc2SubnetId("mysubnetid")
        .withInstanceCount(2)
        .withKeepJobFlowAliveWhenNoSteps(true)
        .withMasterInstanceType("m4.large")
        .withSlaveInstanceType("m4.large"))
    .withSteps(stepConfigs);

RunJobFlowResult result = amazonClient.getEmrClient().runJobFlow(request);
---------------------------------------------------------------------------------------------------------

And this is how I add the job when the cluster is ready:
------------------------------------------------------------------------------------------
StepConfig runJobStep = new StepConfig()
    .withName("run-job-step")
    .withActionOnFailure(ActionOnFailure.TERMINATE_CLUSTER)
    .withHadoopJarStep(new HadoopJarStepConfig("command-runner.jar")
        .withArgs("bash", "-c", "flink run -m yarn-cluster --parallelism 2"
            + " --class es.trendit.flink.job.centrality.CentralityJob"
            + " /home/hadoop/trendit-flink-jobs.jar <args...>"));

AddJobFlowStepsRequest request = new AddJobFlowStepsRequest()
    .withJobFlowId(clusterId)
    .withSteps(runJobStep);

AddJobFlowStepsResult result =
    amazonClient.getEmrClient().addJobFlowSteps(request);
-----------------------------------------------------------------------------------------------

In summary:
- I'm using 2 EMR instances of type m4.large (2 vCPU, 8GB each)
- jobmanager.heap.size and taskmanager.heap.size: 6g
- taskmanager.numberOfTaskSlots: 2
- flink is run with --parallelism 2
- so I expect one EMR instance to run the jobmanager and the other to run
  the taskmanager with 2 slots available

But it fails after some time and I see this warning in the step stdout file:
----------------------------------------------------------------------------------------------------------------------
2020-03-31 14:37:47,288 WARN  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - This YARN session requires 12288MB of memory in the cluster. There are currently only 6144MB available.
The Flink YARN client will try to allocate the YARN session, but maybe not all TaskManagers are connecting from the beginning because the resources are currently not available in the cluster. The allocation might take more time than usual because the Flink YARN client needs to wait until the resources become available.
2020-03-31 14:37:47,294 WARN  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - There is not enough memory available in the YARN cluster. The TaskManager(s) require 6144MB each. NodeManagers available: [6144]
After allocating the JobManager (6144MB) and (0/1) TaskManagers, the following NodeManagers are available: [0]
The Flink YARN client will try to allocate the YARN session, but maybe not all TaskManagers are connecting from the beginning because the resources are currently not available in the cluster. The allocation might take more time than usual because the Flink YARN client needs to wait until the resources become available.
2020-03-31 14:37:47,296 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - Cluster specification: ClusterSpecification{masterMemoryMB=6144, taskManagerMemoryMB=6144, numberTaskManagers=1, slotsPerTaskManager=2}
----------------------------------------------------------------------------------------------------------------------


And this error in the step stderr file:
----------------------------------------------------------------------------------------------------------------------

org.apache.flink.client.program.ProgramInvocationException: Job failed. (JobID: 1f0a651302d5fd48d35ff5b5d0880f99)
at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:268)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:483)
...
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:265)
... 23 more
Caused by: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Could not allocate enough slots within timeout of 300000 ms to run the job. Please make sure that the cluster has enough resources.
at org.apache.flink.runtime.executiongraph.Execution.lambda$scheduleForExecution$0(Execution.java:449)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
...
----------------------------------------------------------------------------------------------------------------------


It looks to me like the TaskManager is not created at startup. Any idea
why this is happening and how to solve it? I could not find any relevant
information in the Flink docs.

Thanks

Re: Flink in EMR configuration problem

Posted by Piotr Nowojski <pi...@ververica.com>.
Hi,

Sorry, I missed that. But yes, it looks like you are running two JobManagers :) You can always check the YARN logs for more information on what is being executed.

Piotrek

> On 1 Apr 2020, at 16:44, Antonio Martínez Carratalá <am...@alto-analytics.com> wrote:
> 
> Hi Piotr,
> 
> I don't have 2 task managers, just one with 2 slots. That would be ok according to my calculations, but as Craig said I need one more instance for the cluster master. I was guessing the job manager was running in the master and the task manager in the slave, but both job manager and task manager run on slaves so I need 3 instances instead of 2 as I guessed.
> 
> Regards


Re: Flink in EMR configuration problem

Posted by Antonio Martínez Carratalá <am...@alto-analytics.com>.
Hi Piotr,

I don't have 2 task managers, just one with 2 slots. That would be fine
according to my calculations, but as Craig said, I need one more instance
for the cluster master. I had assumed the job manager ran on the master
and the task manager on the slave, but both the job manager and the task
manager run on slaves, so I need 3 instances instead of the 2 I had guessed.

Regards
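
To make the instance count concrete, here is a minimal sketch of the placement reasoning. It is a simplified first-fit model, not how the YARN scheduler is actually implemented. The class and method names are hypothetical. It assumes the master instance hosts no YARN containers and that every other instance offers 6144MB to YARN, which is the NodeManager figure reported in the posted log.

```java
import java.util.Arrays;

public class ContainerPlacement {

    /**
     * Simplified first-fit placement: returns how many of the requested
     * containers fit on the given NodeManagers. A container must fit
     * entirely on a single node.
     */
    static int place(int[] nodeManagerFreeMb, int[] containerMb) {
        int[] free = Arrays.copyOf(nodeManagerFreeMb, nodeManagerFreeMb.length);
        int placed = 0;
        for (int need : containerMb) {
            for (int i = 0; i < free.length; i++) {
                if (free[i] >= need) {
                    free[i] -= need;
                    placed++;
                    break;
                }
            }
        }
        return placed;
    }

    /**
     * Assumption: one instance is the master and hosts no containers;
     * each remaining instance offers perNodeMb to YARN.
     */
    static int[] nodeManagers(int instanceCount, int perNodeMb) {
        int[] nodes = new int[Math.max(0, instanceCount - 1)];
        Arrays.fill(nodes, perNodeMb);
        return nodes;
    }

    public static void main(String[] args) {
        // JobManager container first, then the single TaskManager container.
        int[] containers = {6144, 6144};
        System.out.println("2 instances: "
            + place(nodeManagers(2, 6144), containers) + " of 2 containers placed");
        System.out.println("3 instances: "
            + place(nodeManagers(3, 6144), containers) + " of 2 containers placed");
    }
}
```

Under these assumptions, 2 instances leave room for the job manager only, while 3 instances fit both containers, which matches the (0/1) TaskManagers reported in the warning.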

On Wed, Apr 1, 2020 at 1:31 PM Piotr Nowojski <pi...@ververica.com> wrote:

> Hey,
>
> Isn’t explanation of the problem in the logs that you posted? Not enough
> memory? You have 2 EMR nodes, 8GB memory each, while trying to allocate 2
> TaskManagers AND 1 JobManager with 6GB heap size each?
>
> Piotrek

Re: Flink in EMR configuration problem

Posted by Piotr Nowojski <pi...@ververica.com>.
Hey,

Isn’t the explanation of the problem in the logs that you posted? Not enough memory? You have 2 EMR nodes with 8GB of memory each, while trying to allocate 2 TaskManagers AND 1 JobManager with a 6GB heap each?

Piotrek
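
As a quick sanity check of the figures in the posted warning, here is a minimal sketch of the arithmetic. The class and method names are made up for illustration. All values are taken from the log (ClusterSpecification and the "currently only 6144MB available" line), and the calculation assumes the session needs one JobManager container plus one container per TaskManager.

```java
public class YarnMemoryCheck {

    /** Memory the session requests: one JobManager plus N TaskManager containers. */
    static int requiredMb(int jobManagerMb, int taskManagerMb, int taskManagers) {
        return jobManagerMb + taskManagerMb * taskManagers;
    }

    public static void main(String[] args) {
        int jobManagerMb = 6144;   // masterMemoryMB in the posted log
        int taskManagerMb = 6144;  // taskManagerMemoryMB in the posted log
        int taskManagers = 1;      // numberTaskManagers in the posted log
        int availableMb = 6144;    // memory YARN reported as available

        int required = requiredMb(jobManagerMb, taskManagerMb, taskManagers);
        System.out.println("required=" + required + "MB, available=" + availableMb + "MB");
        System.out.println("shortfall=" + Math.max(0, required - availableMb) + "MB");
    }
}
```

The required total comes out at 12288MB, the same number the YARN client prints, against 6144MB available.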
