Posted to user@flink.apache.org by Adrian Hains <ah...@salesforce.com> on 2018/04/13 06:59:51 UTC

Unable to launch job with 100 SQL queries in yarn cluster

Hi,
We are having trouble scaling up Flink to execute a collection of SQL
queries on a yarn cluster. Has anyone run this kind of workload on a
cluster? Any tips on how to get past this issue?

With a high number of Flink SQL queries (100 instances of the query at the
bottom of this message), the Flink command line client fails with a
“JobManager did not respond within 600000 ms” error on a YARN cluster. The
JobManager log shows nothing after the last TaskManager start, which
suggests it is hung (creating the ExecutionGraph?). The same set of queries
works as a standalone program locally, and I can also successfully launch
and process 2 instances of the query in cluster mode.
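
(If the JM is merely slow rather than truly hung, stretching the
client-side wait might at least get past submission. akka.client.timeout
is my best guess at the relevant flink-conf.yaml key for this Flink
generation, and the value is illustrative, not something we have
validated:)

    # flink-conf.yaml -- illustrative only; akka.client.timeout is the
    # ask timeout the command line client uses when waiting on the
    # JobManager during submission.
    akka.client.timeout: 1200 s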

When attempting 10 query instances in cluster mode, we are able to submit,
but the job errors out with “Insufficient number of network buffers:
required 725, but only 135 available. The total number of network buffers
is currently set to 61035 of 32768 bytes each.” Surprisingly, with a query
count of 1, only 15000 network buffers are needed in total, so the required
buffer count appears to scale quickly with the number of queries.
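
(For reference, the network buffer pool is sized via the
taskmanager.network.memory.* options in flink-conf.yaml; each parallel
shuffle edge needs buffers on both the producing and the consuming side,
so independent pipelines at parallelism 725 multiply the demand quickly.
The values below are illustrative guesses, not settings we have tuned:)

    # flink-conf.yaml -- illustrative values, not tuned for this workload
    taskmanager.network.memory.fraction: 0.2
    taskmanager.network.memory.min: 268435456
    taskmanager.network.memory.max: 4294967296
    # older, absolute alternative to the fraction-based sizing:
    # taskmanager.network.numberOfBuffers: 131072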

Note: Each Row in structStream contains 515 columns (very sparse table,
>500 are null for each row) including a column that has the raw message.

In the YARN cluster we specify 18 GB for the TaskManager, 18 GB for the
JobManager, 5 slots, and a parallelism of 725 (the number of partitions in
our Kafka source).
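
(A submission along these lines would express that sizing with the
1.4-era CLI flags; the jar path and the container count are placeholders,
not our exact values, and the memory flags are in MB:)

    # Sketch of the YARN submission with the sizing above. -yn is the
    # number of TaskManager containers (placeholder), -ys the slots per
    # TaskManager, -yjm/-ytm the JobManager/TaskManager memory in MB.
    ./bin/flink run -m yarn-cluster \
        -yn 145 -ys 5 \
        -yjm 18432 -ytm 18432 \
        -p 725 \
        path/to/query-job.jar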

The query is a simple filter and aggregation:

select count(*), 'idnumber' as criteria, Environment, CollectedTimestamp,
       EventTimestamp, RawMsg, Source
from structStream
where Environment='MyEnvironment' and Rule='MyRule'
  and LogType='MyLogType' and Outcome='Success'
group by tumble(proctime, INTERVAL '1' SECOND), Environment,
       CollectedTimestamp, EventTimestamp, RawMsg, Source
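
(For context, this is roughly how the instances are wired up; a
simplified sketch rather than the real code, which is attached to the
JIRA issue below. The table name structStream, the StreamTableEnvironment
variable tableEnv, and the discarding sink are stand-ins for our actual
setup.)

    import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.types.Row;

    // ... inside the job's main(), with "structStream" already
    // registered on the StreamTableEnvironment tableEnv:
    String query =
        "select count(*), 'idnumber' as criteria, Environment, "
            + "CollectedTimestamp, EventTimestamp, RawMsg, Source "
            + "from structStream "
            + "where Environment='MyEnvironment' and Rule='MyRule' "
            + "and LogType='MyLogType' and Outcome='Success' "
            + "group by tumble(proctime, INTERVAL '1' SECOND), Environment, "
            + "CollectedTimestamp, EventTimestamp, RawMsg, Source";

    for (int i = 0; i < 100; i++) {
        // Each call builds an independent pipeline in the same job
        // graph, which is why resource demand grows with query count.
        Table result = tableEnv.sqlQuery(query);
        tableEnv.toAppendStream(result, Row.class)
                .addSink(new DiscardingSink<>());
    }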


The code is included in https://issues.apache.org/jira/browse/FLINK-9166

thanks!
-a


Re: Unable to launch job with 100 SQL queries in yarn cluster

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Adrian,

Thanks for reaching out to the community. I don't think this is an issue
with Flink's SQL support. SQL queries are translated into regular streaming
(or batch) jobs.

The JM might just be overloaded by too many jobs. Since you are running in
a YARN environment, it might make sense to start additional Flink clusters
and distribute the queries across more JMs.
In the upcoming Flink 1.5 release, the scheduling and the integration with
resource managers will be completely reworked, which will make a
one-cluster-per-job deployment easier to maintain.
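
A rough sketch of that approach, assuming the job jar accepts a
(hypothetical) --query-count argument so each submission runs a slice of
the queries; the memory values are illustrative:

    # Split the 100 queries across 10 per-job YARN clusters, each with
    # its own JobManager. --query-count is a hypothetical argument of
    # the job jar, not a Flink flag; -d submits in detached mode.
    for i in $(seq 1 10); do
        ./bin/flink run -m yarn-cluster -d \
            -yjm 4096 -ytm 18432 -ys 5 \
            path/to/query-job.jar --query-count 10
    done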

I've added some details to FLINK-9166.

Best,
Fabian
