You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Antonio Martínez Carratalá <am...@alto-analytics.com> on 2020/03/30 11:22:30 UTC

Run several jobs in parallel in same EMR cluster?

Hello

I'm running Flink over Amazon EMR and I'm trying to send several different
batch jobs to the cluster after creating it.

This is my cluster creation code:
----------------------------------------------------------------
StepConfig copyJarStep = new StepConfig()
    .withName("copy-jar-step")
    .withActionOnFailure(ActionOnFailure.TERMINATE_CLUSTER)
    .withHadoopJarStep(new HadoopJarStepConfig("command-runner.jar")
        .withArgs("bash", "-c", "aws s3 cp s3://" + bucketName +
"/lib/flink-jobs.jar /home/hadoop/flink-jobs.jar"));

List<StepConfig> stepConfigs = new ArrayList<>();
stepConfigs.add(copyJarStep);

Application flink = new Application().withName("Flink");

Configuration flinkConfiguration = new Configuration()
     .withClassification("flink-conf")
    .addPropertiesEntry("jobmanager.heap.size", "2048m")
    .addPropertiesEntry("taskmanager.heap.size",  "2048m")

RunJobFlowRequest request = new RunJobFlowRequest()
    .withName("cluster-" + executionKey)
    .withReleaseLabel("emr-5.26.0")
    .withApplications(flink)
    .withConfigurations(flinkConfiguration)
    .withServiceRole("EMR_DefaultRole")
    .withJobFlowRole("EMR_EC2_DefaultRole")
    .withLogUri(getWorkPath() + "logs")
    .withInstances(new JobFlowInstancesConfig()
        .withEc2SubnetId("subnetid")
        .withInstanceCount(2) // 1 for task manager + 1 for job manager
        .withKeepJobFlowAliveWhenNoSteps(true)
        .withMasterInstanceType("m4.large")
        .withSlaveInstanceType("m4.large"))
    .withSteps(stepConfigs);

RunJobFlowResult result = amazonClient.getEmrClient().runJobFlow(request);
----------------------------------------------------------------------------------------------------------------

And this is how I add the jobs:
---------------------------------------------------------------------------------
StepConfig runJobStep = new StepConfig()
    .withName("run-job-step")
    .withActionOnFailure(ActionOnFailure.TERMINATE_CLUSTER)
    .withHadoopJarStep(new HadoopJarStepConfig("command-runner.jar")
    .withArgs("bash", "-c", "flink run -m yarn-cluster"
        + " --parallelism " + parallelism
        + " --class " + jobClass.getCanonicalName()
        + " /home/hadoop/flink-jobs.jar "
        + jobArguments));

AddJobFlowStepsRequest request = new AddJobFlowStepsRequest()
    .withJobFlowId(clusterId)
    .withSteps(runJobStep);

AddJobFlowStepsResult result =
amazonClient.getEmrClient().addJobFlowSteps(request);
---------------------------------------------------------------------------------

And these are my jobs:

- Job1 - parallelism 1
- Job2 - parallelism 1
- Job3 - parallelism 2

I'm using m4.large machines as slave so I have 2 cores in it, and I was
expecting that Job1 and Job2 were running in parallel and then Job3 when
Job1 and Job2 finish, but what I see is that Job2 is waiting (Pending
status) for Job1 to finish before start. I see only one task manager is
created for Job1, when finishes another one is created for Job2, and then 2
are created for Job3

Since I have 2 cores available why is it not running Job2 in the other
instead of wait? is there any way to configure it?

Thanks

Re: Run several jobs in parallel in same EMR cluster?

Posted by Antonio Martínez Carratalá <am...@alto-analytics.com>.
I could not make it work as I wanted with taskmanager.numberOfTaskSlots to
2, but I found a way for running them in parallel, just creating a cluster
for each job since they are independent

Thanks

On Mon, Mar 30, 2020 at 4:22 PM Gary Yao <ga...@apache.org> wrote:

> Can you try to set config option taskmanager.numberOfTaskSlots to 2? By
> default the TMs only offer one slot [1] independent from the number of CPU
> cores.
>
> Best,
> Gary
>
> [1]
> https://github.com/apache/flink/blob/da3082764117841d885f41c645961f8993a331a0/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java#L197-L199
>
> On Mon, Mar 30, 2020 at 1:22 PM Antonio Martínez Carratalá <
> amartinez@alto-analytics.com> wrote:
>
>> Hello
>>
>> I'm running Flink over Amazon EMR and I'm trying to send several
>> different batch jobs to the cluster after creating it.
>>
>> This is my cluster creation code:
>> ----------------------------------------------------------------
>> StepConfig copyJarStep = new StepConfig()
>>     .withName("copy-jar-step")
>>     .withActionOnFailure(ActionOnFailure.TERMINATE_CLUSTER)
>>     .withHadoopJarStep(new HadoopJarStepConfig("command-runner.jar")
>>         .withArgs("bash", "-c", "aws s3 cp s3://" + bucketName +
>> "/lib/flink-jobs.jar /home/hadoop/flink-jobs.jar"));
>>
>> List<StepConfig> stepConfigs = new ArrayList<>();
>> stepConfigs.add(copyJarStep);
>>
>> Application flink = new Application().withName("Flink");
>>
>> Configuration flinkConfiguration = new Configuration()
>>      .withClassification("flink-conf")
>>     .addPropertiesEntry("jobmanager.heap.size", "2048m")
>>     .addPropertiesEntry("taskmanager.heap.size",  "2048m")
>>
>> RunJobFlowRequest request = new RunJobFlowRequest()
>>     .withName("cluster-" + executionKey)
>>     .withReleaseLabel("emr-5.26.0")
>>     .withApplications(flink)
>>     .withConfigurations(flinkConfiguration)
>>     .withServiceRole("EMR_DefaultRole")
>>     .withJobFlowRole("EMR_EC2_DefaultRole")
>>     .withLogUri(getWorkPath() + "logs")
>>     .withInstances(new JobFlowInstancesConfig()
>>         .withEc2SubnetId("subnetid")
>>         .withInstanceCount(2) // 1 for task manager + 1 for job manager
>>         .withKeepJobFlowAliveWhenNoSteps(true)
>>         .withMasterInstanceType("m4.large")
>>         .withSlaveInstanceType("m4.large"))
>>     .withSteps(stepConfigs);
>>
>> RunJobFlowResult result = amazonClient.getEmrClient().runJobFlow(request);
>>
>> ----------------------------------------------------------------------------------------------------------------
>>
>> And this is how I add the jobs:
>>
>> ---------------------------------------------------------------------------------
>> StepConfig runJobStep = new StepConfig()
>>     .withName("run-job-step")
>>     .withActionOnFailure(ActionOnFailure.TERMINATE_CLUSTER)
>>     .withHadoopJarStep(new HadoopJarStepConfig("command-runner.jar")
>>     .withArgs("bash", "-c", "flink run -m yarn-cluster"
>>         + " --parallelism " + parallelism
>>         + " --class " + jobClass.getCanonicalName()
>>         + " /home/hadoop/flink-jobs.jar "
>>         + jobArguments));
>>
>> AddJobFlowStepsRequest request = new AddJobFlowStepsRequest()
>>     .withJobFlowId(clusterId)
>>     .withSteps(runJobStep);
>>
>> AddJobFlowStepsResult result =
>> amazonClient.getEmrClient().addJobFlowSteps(request);
>>
>> ---------------------------------------------------------------------------------
>>
>> And these are my jobs:
>>
>> - Job1 - parallelism 1
>> - Job2 - parallelism 1
>> - Job3 - parallelism 2
>>
>> I'm using m4.large machines as slave so I have 2 cores in it, and I was
>> expecting that Job1 and Job2 were running in parallel and then Job3 when
>> Job1 and Job2 finish, but what I see is that Job2 is waiting (Pending
>> status) for Job1 to finish before start. I see only one task manager is
>> created for Job1, when finishes another one is created for Job2, and then 2
>> are created for Job3
>>
>> Since I have 2 cores available why is it not running Job2 in the other
>> instead of wait? is there any way to configure it?
>>
>> Thanks
>>
>>
>>
>>

Re: Run several jobs in parallel in same EMR cluster?

Posted by Gary Yao <ga...@apache.org>.
Can you try to set config option taskmanager.numberOfTaskSlots to 2? By
default the TMs only offer one slot [1] independent from the number of CPU
cores.

Best,
Gary

[1]
https://github.com/apache/flink/blob/da3082764117841d885f41c645961f8993a331a0/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java#L197-L199

On Mon, Mar 30, 2020 at 1:22 PM Antonio Martínez Carratalá <
amartinez@alto-analytics.com> wrote:

> Hello
>
> I'm running Flink over Amazon EMR and I'm trying to send several different
> batch jobs to the cluster after creating it.
>
> This is my cluster creation code:
> ----------------------------------------------------------------
> StepConfig copyJarStep = new StepConfig()
>     .withName("copy-jar-step")
>     .withActionOnFailure(ActionOnFailure.TERMINATE_CLUSTER)
>     .withHadoopJarStep(new HadoopJarStepConfig("command-runner.jar")
>         .withArgs("bash", "-c", "aws s3 cp s3://" + bucketName +
> "/lib/flink-jobs.jar /home/hadoop/flink-jobs.jar"));
>
> List<StepConfig> stepConfigs = new ArrayList<>();
> stepConfigs.add(copyJarStep);
>
> Application flink = new Application().withName("Flink");
>
> Configuration flinkConfiguration = new Configuration()
>      .withClassification("flink-conf")
>     .addPropertiesEntry("jobmanager.heap.size", "2048m")
>     .addPropertiesEntry("taskmanager.heap.size",  "2048m")
>
> RunJobFlowRequest request = new RunJobFlowRequest()
>     .withName("cluster-" + executionKey)
>     .withReleaseLabel("emr-5.26.0")
>     .withApplications(flink)
>     .withConfigurations(flinkConfiguration)
>     .withServiceRole("EMR_DefaultRole")
>     .withJobFlowRole("EMR_EC2_DefaultRole")
>     .withLogUri(getWorkPath() + "logs")
>     .withInstances(new JobFlowInstancesConfig()
>         .withEc2SubnetId("subnetid")
>         .withInstanceCount(2) // 1 for task manager + 1 for job manager
>         .withKeepJobFlowAliveWhenNoSteps(true)
>         .withMasterInstanceType("m4.large")
>         .withSlaveInstanceType("m4.large"))
>     .withSteps(stepConfigs);
>
> RunJobFlowResult result = amazonClient.getEmrClient().runJobFlow(request);
>
> ----------------------------------------------------------------------------------------------------------------
>
> And this is how I add the jobs:
>
> ---------------------------------------------------------------------------------
> StepConfig runJobStep = new StepConfig()
>     .withName("run-job-step")
>     .withActionOnFailure(ActionOnFailure.TERMINATE_CLUSTER)
>     .withHadoopJarStep(new HadoopJarStepConfig("command-runner.jar")
>     .withArgs("bash", "-c", "flink run -m yarn-cluster"
>         + " --parallelism " + parallelism
>         + " --class " + jobClass.getCanonicalName()
>         + " /home/hadoop/flink-jobs.jar "
>         + jobArguments));
>
> AddJobFlowStepsRequest request = new AddJobFlowStepsRequest()
>     .withJobFlowId(clusterId)
>     .withSteps(runJobStep);
>
> AddJobFlowStepsResult result =
> amazonClient.getEmrClient().addJobFlowSteps(request);
>
> ---------------------------------------------------------------------------------
>
> And these are my jobs:
>
> - Job1 - parallelism 1
> - Job2 - parallelism 1
> - Job3 - parallelism 2
>
> I'm using m4.large machines as slave so I have 2 cores in it, and I was
> expecting that Job1 and Job2 were running in parallel and then Job3 when
> Job1 and Job2 finish, but what I see is that Job2 is waiting (Pending
> status) for Job1 to finish before start. I see only one task manager is
> created for Job1, when finishes another one is created for Job2, and then 2
> are created for Job3
>
> Since I have 2 cores available why is it not running Job2 in the other
> instead of wait? is there any way to configure it?
>
> Thanks
>
>
>
>