Posted to user@flink.apache.org by Niels Basjes <Ni...@basjes.nl> on 2016/08/18 11:56:25 UTC

Batch jobs with a very large number of input splits

Hi,

I'm working on a batch process using Flink and I ran into an interesting
problem.
The number of input splits in my job is really, really large.

I currently have an HBase input (with more than 1000 regions), and in the
past I have worked with MapReduce jobs processing 2000+ files.

The problem I have is that if I run such a job in a "small" yarn-session
(i.e. less than 1000 tasks) I get a fatal error indicating that there are
not enough resources.
For a continuous streaming job this makes sense, yet for a batch job (like
mine) this is an undesirable error.

For my HBase situation I currently have a workaround: I override the
createInputSplits method of the TableInputFormat and thus control the
input splits that are created.

What is the correct way to solve this? (No, my cluster is NOT big enough to
run that many parallel tasks.)


-- 
Best regards / Met vriendelijke groeten,

Niels Basjes

Re: Batch jobs with a very large number of input splits

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Niels,

Yes, in YARN mode the default parallelism is the number of available slots.

You can change the default task parallelism like this:

1) Use the -p parameter when submitting a job via the CLI client [1]
2) Set a parallelism on the execution environment: env.setParallelism() [2]

Best, Fabian

[1] https://ci.apache.org/projects/flink/flink-docs-master/apis/cli.html
[2] https://ci.apache.org/projects/flink/flink-docs-master/apis/common/index.html#execution-environment-level



Re: Batch jobs with a very large number of input splits

Posted by Niels Basjes <Ni...@basjes.nl>.
I did more digging and finally understand what goes wrong.
I create a yarn-session with 50 slots.
Then I run my job, which has a lot of input splits because my HBase table
has hundreds of regions.
The job then runs with parallelism 50 because I did not specify a value.
As a consequence, the second job I start in the same yarn-session is faced
with 0 available task slots and fails with this exception:

08/23/2016 09:58:52 Job execution switched to status FAILING.
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
Not enough free slots available to run the job. You can decrease the
operator parallelism or increase the number of slots per TaskManager in the
configuration. Task to schedule: ...... Resources available to scheduler:
Number of instances=5, total number of slots=50, available slots=0

So my conclusion for now is that if you want to run batch jobs in a
yarn-session then you MUST specify the parallelism for all steps;
otherwise the first job will fill the yarn-session completely and you
cannot run multiple jobs in parallel.

Is this conclusion correct?

Niels Basjes




-- 
Best regards / Met vriendelijke groeten,

Niels Basjes
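
The slot arithmetic behind the conclusion above can be sketched in a few
lines of plain Java. This is only an illustration, not Flink code: the
class and method names are made up for this example, and it assumes the
simplification that a job occupies as many slots as its parallelism and
that nothing else runs in the session. The capped value is what one would
then pass via -p or env.setParallelism().

```java
final class SlotBudget {

    /** Largest parallelism that lets `concurrentJobs` jobs run side by side. */
    static int parallelismPerJob(int totalSlots, int concurrentJobs) {
        if (totalSlots < 1 || concurrentJobs < 1) {
            throw new IllegalArgumentException("slots and jobs must be >= 1");
        }
        // Integer division: any remainder slots simply stay unused.
        return Math.max(1, totalSlots / concurrentJobs);
    }

    /** Slots still free once a job with the given parallelism is running
     *  (assumption: one slot per parallel instance). */
    static int slotsLeft(int totalSlots, int jobParallelism) {
        return Math.max(0, totalSlots - jobParallelism);
    }

    public static void main(String[] args) {
        int totalSlots = 50; // the yarn-session from the message above

        // No parallelism given: the job takes every slot in the session.
        System.out.println(slotsLeft(totalSlots, totalSlots)); // prints 0

        // Capped so that two jobs fit: 25 slots each.
        int capped = parallelismPerJob(totalSlots, 2);
        System.out.println(capped);                            // prints 25
        System.out.println(slotsLeft(totalSlots, capped));     // prints 25
    }
}
```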

Re: Batch jobs with a very large number of input splits

Posted by Robert Metzger <rm...@apache.org>.
Hi Niels,

In Flink, you don't need one task per file, since splits are assigned
lazily to reading tasks.
What exactly is the error you are getting when trying to read that many
input splits? (Is it on the JobManager?)

Regards,
Robert
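
To make the point about lazy assignment concrete, here is a small
stand-alone Java sketch. It does not use Flink at all; the shared queue is
only a stand-in for a split assigner, and all names are hypothetical. It
shows the idea that a fixed, small number of reader tasks can work through
a much larger number of splits, each asking for the next split only when
it has finished the previous one.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class LazySplitAssignment {

    /** Runs `parallelism` readers over `numSplits` splits and returns
     *  how many splits each reader ended up processing. */
    static int[] run(int numSplits, int parallelism) {
        // All splits wait in one shared queue until a reader asks for one.
        Queue<Integer> pending = new ConcurrentLinkedQueue<>();
        for (int i = 0; i < numSplits; i++) {
            pending.add(i);
        }

        ExecutorService readers = Executors.newFixedThreadPool(parallelism);
        List<Future<Integer>> counts = new ArrayList<>();
        for (int r = 0; r < parallelism; r++) {
            counts.add(readers.submit(() -> {
                int processed = 0;
                // Pull the next split only after the previous one is done.
                while (pending.poll() != null) {
                    processed++; // a real reader would read the split here
                }
                return processed;
            }));
        }
        readers.shutdown();

        int[] result = new int[parallelism];
        try {
            for (int r = 0; r < parallelism; r++) {
                result[r] = counts.get(r).get();
            }
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException("reader failed", e);
        }
        return result;
    }

    public static void main(String[] args) {
        // 1000 splits (think: HBase regions), but only 4 parallel readers.
        int total = 0;
        for (int n : run(1000, 4)) {
            total += n;
        }
        System.out.println("splits processed: " + total);
    }
}
```

How the splits spread over the readers varies from run to run; only the
total is fixed. The number of splits therefore does not dictate the number
of parallel tasks.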
