You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Robert Waury <ro...@googlemail.com> on 2014/10/13 13:07:33 UTC

(Unknown)

Hi,

I performed the Yarn Setup on a cluster running Apache Hadoop
2.3.0-cdh5.1.3 like described on the website.

I could see the allocated containers in the Yarn ResourceManger and after
starting a Flink job via the CLI client it showed up on the Flink Dashboard.

The problem is that the job which runs in about 17 minutes in my local VM
(3 cores, 4GB RAM, input from local files) now takes about 25 minutes on
the cluster (18 containers with 4GB and 8 cores each, input from HDFS with
rf=5).

>From the Flink log it seemed all data was shuffled to a single machine even
for FlatMap operations.

log excerpt:

10:54:08,832 INFO
org.apache.flink.runtime.jobmanager.splitassigner.file.FileInputSplitList
 - nceorihad06 (ipcPort=56158, dataPort=55744) receives remote file
input split (distance 2147483647)
10:54:08,832 INFO
org.apache.flink.runtime.jobmanager.splitassigner.InputSplitManager  -
CHAIN DataSource (TextInputFormat
(hdfs:/user/rwaury/input/all_catalog_140410.txt) - UTF-8) -> FlatMap
(com.amadeus.pcb.join.FlightConnectionJoiner$FilteringUTCExtractor)
(1/1) receives input split 5
10:54:09,589 INFO
org.apache.flink.runtime.jobmanager.splitassigner.file.FileInputSplitList
 - nceorihad06 (ipcPort=56158, dataPort=55744) receives remote file
input split (distance 2147483647)
10:54:09,590 INFO
org.apache.flink.runtime.jobmanager.splitassigner.InputSplitManager  -
CHAIN DataSource (TextInputFormat
(hdfs:/user/rwaury/input/all_catalog_140410.txt) - UTF-8) -> FlatMap
(com.amadeus.pcb.join.FlightConnectionJoiner$FilteringUTCExtractor)
(1/1) receives input split 128

The job takes two large input files (~9 GB) and after filtering and
converting them with a FlatMap (selectivity is below 1%) it joins them each
twice with a small data set (< 1MB) after that the join results are joined
with each other. The result is about 2.7 GB.

Any idea what causes this?

Cheers,
Robert

Re:

Posted by Robert Metzger <rm...@apache.org>.
Are you referring to https://issues.apache.org/jira/browse/FLINK-968? So as
I said, users can pass the the "-s" parameter to set the number of slots
per container and the number is being used by the CliFrontned.


I just found out that only the YARN cluster setup page mentions slots at
all.
So how slots are being used is basically not documented, however a very
important concept to properly configure and run Flink. ( -->
https://issues.apache.org/jira/browse/FLINK-1157)

On Mon, Oct 13, 2014 at 2:13 PM, Stephan Ewen <se...@apache.org> wrote:

> There is a ticket open for that, to configure the default DOP based on the
> number of containers and slots. It is not implemented, yet, though.
>
>
>
> On Mon, Oct 13, 2014 at 2:09 PM, Robert Waury <robert.waury@googlemail.com
> > wrote:
>
>> Yes, I'm running 0.6.1
>>
>> Setting DOP manually worked, thanks.
>>
>> Computation time is now down to around a 100 seconds.
>>
>> Is there a way to let Flink figure out the DOP automatically within a
>> Yarn application or do I always have to set it manually?
>>
>> Cheers,
>> Robert
>>
>>
>>
>> On Mon, Oct 13, 2014 at 1:23 PM, Stephan Ewen <se...@apache.org> wrote:
>>
>>> Hi!
>>>
>>> It looks like the job is running with a DOP of one.
>>>
>>> Can you set the DOP higher? Either directly on the ExecutionEnvironment,
>>> or (preferably) through the "-p" parameter on the command line.
>>>
>>> You are using 0.6, is that correct? (Looks like it from the logs)
>>>
>>> Stephan
>>>
>>>
>>> On Mon, Oct 13, 2014 at 1:07 PM, Robert Waury <
>>> robert.waury@googlemail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I performed the Yarn Setup on a cluster running Apache Hadoop
>>>> 2.3.0-cdh5.1.3 like described on the website.
>>>>
>>>> I could see the allocated containers in the Yarn ResourceManger and
>>>> after starting a Flink job via the CLI client it showed up on the Flink
>>>> Dashboard.
>>>>
>>>> The problem is that the job which runs in about 17 minutes in my local
>>>> VM (3 cores, 4GB RAM, input from local files) now takes about 25 minutes on
>>>> the cluster (18 containers with 4GB and 8 cores each, input from HDFS with
>>>> rf=5).
>>>>
>>>> From the Flink log it seemed all data was shuffled to a single machine
>>>> even for FlatMap operations.
>>>>
>>>> log excerpt:
>>>>
>>>> 10:54:08,832 INFO  org.apache.flink.runtime.jobmanager.splitassigner.file.FileInputSplitList  - nceorihad06 (ipcPort=56158, dataPort=55744) receives remote file input split (distance 2147483647)
>>>> 10:54:08,832 INFO  org.apache.flink.runtime.jobmanager.splitassigner.InputSplitManager  - CHAIN DataSource (TextInputFormat (hdfs:/user/rwaury/input/all_catalog_140410.txt) - UTF-8) -> FlatMap (com.amadeus.pcb.join.FlightConnectionJoiner$FilteringUTCExtractor) (1/1) receives input split 5
>>>> 10:54:09,589 INFO  org.apache.flink.runtime.jobmanager.splitassigner.file.FileInputSplitList  - nceorihad06 (ipcPort=56158, dataPort=55744) receives remote file input split (distance 2147483647)
>>>> 10:54:09,590 INFO  org.apache.flink.runtime.jobmanager.splitassigner.InputSplitManager  - CHAIN DataSource (TextInputFormat (hdfs:/user/rwaury/input/all_catalog_140410.txt) - UTF-8) -> FlatMap (com.amadeus.pcb.join.FlightConnectionJoiner$FilteringUTCExtractor) (1/1) receives input split 128
>>>>
>>>> The job takes two large input files (~9 GB) and after filtering and
>>>> converting them with a FlatMap (selectivity is below 1%) it joins them each
>>>> twice with a small data set (< 1MB) after that the join results are joined
>>>> with each other. The result is about 2.7 GB.
>>>>
>>>> Any idea what causes this?
>>>>
>>>> Cheers,
>>>> Robert
>>>>
>>>
>>>
>>
>

Re:

Posted by Stephan Ewen <se...@apache.org>.
There is a ticket open for that, to configure the default DOP based on the
number of containers and slots. It is not implemented, yet, though.



On Mon, Oct 13, 2014 at 2:09 PM, Robert Waury <ro...@googlemail.com>
wrote:

> Yes, I'm running 0.6.1
>
> Setting DOP manually worked, thanks.
>
> Computation time is now down to around a 100 seconds.
>
> Is there a way to let Flink figure out the DOP automatically within a Yarn
> application or do I always have to set it manually?
>
> Cheers,
> Robert
>
>
>
> On Mon, Oct 13, 2014 at 1:23 PM, Stephan Ewen <se...@apache.org> wrote:
>
>> Hi!
>>
>> It looks like the job is running with a DOP of one.
>>
>> Can you set the DOP higher? Either directly on the ExecutionEnvironment,
>> or (preferably) through the "-p" parameter on the command line.
>>
>> You are using 0.6, is that correct? (Looks like it from the logs)
>>
>> Stephan
>>
>>
>> On Mon, Oct 13, 2014 at 1:07 PM, Robert Waury <
>> robert.waury@googlemail.com> wrote:
>>
>>> Hi,
>>>
>>> I performed the Yarn Setup on a cluster running Apache Hadoop
>>> 2.3.0-cdh5.1.3 like described on the website.
>>>
>>> I could see the allocated containers in the Yarn ResourceManger and
>>> after starting a Flink job via the CLI client it showed up on the Flink
>>> Dashboard.
>>>
>>> The problem is that the job which runs in about 17 minutes in my local
>>> VM (3 cores, 4GB RAM, input from local files) now takes about 25 minutes on
>>> the cluster (18 containers with 4GB and 8 cores each, input from HDFS with
>>> rf=5).
>>>
>>> From the Flink log it seemed all data was shuffled to a single machine
>>> even for FlatMap operations.
>>>
>>> log excerpt:
>>>
>>> 10:54:08,832 INFO  org.apache.flink.runtime.jobmanager.splitassigner.file.FileInputSplitList  - nceorihad06 (ipcPort=56158, dataPort=55744) receives remote file input split (distance 2147483647)
>>> 10:54:08,832 INFO  org.apache.flink.runtime.jobmanager.splitassigner.InputSplitManager  - CHAIN DataSource (TextInputFormat (hdfs:/user/rwaury/input/all_catalog_140410.txt) - UTF-8) -> FlatMap (com.amadeus.pcb.join.FlightConnectionJoiner$FilteringUTCExtractor) (1/1) receives input split 5
>>> 10:54:09,589 INFO  org.apache.flink.runtime.jobmanager.splitassigner.file.FileInputSplitList  - nceorihad06 (ipcPort=56158, dataPort=55744) receives remote file input split (distance 2147483647)
>>> 10:54:09,590 INFO  org.apache.flink.runtime.jobmanager.splitassigner.InputSplitManager  - CHAIN DataSource (TextInputFormat (hdfs:/user/rwaury/input/all_catalog_140410.txt) - UTF-8) -> FlatMap (com.amadeus.pcb.join.FlightConnectionJoiner$FilteringUTCExtractor) (1/1) receives input split 128
>>>
>>> The job takes two large input files (~9 GB) and after filtering and
>>> converting them with a FlatMap (selectivity is below 1%) it joins them each
>>> twice with a small data set (< 1MB) after that the join results are joined
>>> with each other. The result is about 2.7 GB.
>>>
>>> Any idea what causes this?
>>>
>>> Cheers,
>>> Robert
>>>
>>
>>
>

Re:

Posted by Robert Metzger <rm...@apache.org>.
In the 0.6.1 release not, no.
With the upcoming 0.7-incubating release, you can set the number of task
slots per Container (-s flag) and this value will be used automatically as
the default DOP.

On Mon, Oct 13, 2014 at 2:09 PM, Robert Waury <ro...@googlemail.com>
wrote:

> Yes, I'm running 0.6.1
>
> Setting DOP manually worked, thanks.
>
> Computation time is now down to around a 100 seconds.
>
> Is there a way to let Flink figure out the DOP automatically within a Yarn
> application or do I always have to set it manually?
>
> Cheers,
> Robert
>
>
>
> On Mon, Oct 13, 2014 at 1:23 PM, Stephan Ewen <se...@apache.org> wrote:
>
>> Hi!
>>
>> It looks like the job is running with a DOP of one.
>>
>> Can you set the DOP higher? Either directly on the ExecutionEnvironment,
>> or (preferably) through the "-p" parameter on the command line.
>>
>> You are using 0.6, is that correct? (Looks like it from the logs)
>>
>> Stephan
>>
>>
>> On Mon, Oct 13, 2014 at 1:07 PM, Robert Waury <
>> robert.waury@googlemail.com> wrote:
>>
>>> Hi,
>>>
>>> I performed the Yarn Setup on a cluster running Apache Hadoop
>>> 2.3.0-cdh5.1.3 like described on the website.
>>>
>>> I could see the allocated containers in the Yarn ResourceManger and
>>> after starting a Flink job via the CLI client it showed up on the Flink
>>> Dashboard.
>>>
>>> The problem is that the job which runs in about 17 minutes in my local
>>> VM (3 cores, 4GB RAM, input from local files) now takes about 25 minutes on
>>> the cluster (18 containers with 4GB and 8 cores each, input from HDFS with
>>> rf=5).
>>>
>>> From the Flink log it seemed all data was shuffled to a single machine
>>> even for FlatMap operations.
>>>
>>> log excerpt:
>>>
>>> 10:54:08,832 INFO  org.apache.flink.runtime.jobmanager.splitassigner.file.FileInputSplitList  - nceorihad06 (ipcPort=56158, dataPort=55744) receives remote file input split (distance 2147483647)
>>> 10:54:08,832 INFO  org.apache.flink.runtime.jobmanager.splitassigner.InputSplitManager  - CHAIN DataSource (TextInputFormat (hdfs:/user/rwaury/input/all_catalog_140410.txt) - UTF-8) -> FlatMap (com.amadeus.pcb.join.FlightConnectionJoiner$FilteringUTCExtractor) (1/1) receives input split 5
>>> 10:54:09,589 INFO  org.apache.flink.runtime.jobmanager.splitassigner.file.FileInputSplitList  - nceorihad06 (ipcPort=56158, dataPort=55744) receives remote file input split (distance 2147483647)
>>> 10:54:09,590 INFO  org.apache.flink.runtime.jobmanager.splitassigner.InputSplitManager  - CHAIN DataSource (TextInputFormat (hdfs:/user/rwaury/input/all_catalog_140410.txt) - UTF-8) -> FlatMap (com.amadeus.pcb.join.FlightConnectionJoiner$FilteringUTCExtractor) (1/1) receives input split 128
>>>
>>> The job takes two large input files (~9 GB) and after filtering and
>>> converting them with a FlatMap (selectivity is below 1%) it joins them each
>>> twice with a small data set (< 1MB) after that the join results are joined
>>> with each other. The result is about 2.7 GB.
>>>
>>> Any idea what causes this?
>>>
>>> Cheers,
>>> Robert
>>>
>>
>>
>

Re:

Posted by Robert Waury <ro...@googlemail.com>.
Yes, I'm running 0.6.1

Setting DOP manually worked, thanks.

Computation time is now down to around a 100 seconds.

Is there a way to let Flink figure out the DOP automatically within a Yarn
application or do I always have to set it manually?

Cheers,
Robert



On Mon, Oct 13, 2014 at 1:23 PM, Stephan Ewen <se...@apache.org> wrote:

> Hi!
>
> It looks like the job is running with a DOP of one.
>
> Can you set the DOP higher? Either directly on the ExecutionEnvironment,
> or (preferably) through the "-p" parameter on the command line.
>
> You are using 0.6, is that correct? (Looks like it from the logs)
>
> Stephan
>
>
> On Mon, Oct 13, 2014 at 1:07 PM, Robert Waury <robert.waury@googlemail.com
> > wrote:
>
>> Hi,
>>
>> I performed the Yarn Setup on a cluster running Apache Hadoop
>> 2.3.0-cdh5.1.3 like described on the website.
>>
>> I could see the allocated containers in the Yarn ResourceManger and after
>> starting a Flink job via the CLI client it showed up on the Flink Dashboard.
>>
>> The problem is that the job which runs in about 17 minutes in my local VM
>> (3 cores, 4GB RAM, input from local files) now takes about 25 minutes on
>> the cluster (18 containers with 4GB and 8 cores each, input from HDFS with
>> rf=5).
>>
>> From the Flink log it seemed all data was shuffled to a single machine
>> even for FlatMap operations.
>>
>> log excerpt:
>>
>> 10:54:08,832 INFO  org.apache.flink.runtime.jobmanager.splitassigner.file.FileInputSplitList  - nceorihad06 (ipcPort=56158, dataPort=55744) receives remote file input split (distance 2147483647)
>> 10:54:08,832 INFO  org.apache.flink.runtime.jobmanager.splitassigner.InputSplitManager  - CHAIN DataSource (TextInputFormat (hdfs:/user/rwaury/input/all_catalog_140410.txt) - UTF-8) -> FlatMap (com.amadeus.pcb.join.FlightConnectionJoiner$FilteringUTCExtractor) (1/1) receives input split 5
>> 10:54:09,589 INFO  org.apache.flink.runtime.jobmanager.splitassigner.file.FileInputSplitList  - nceorihad06 (ipcPort=56158, dataPort=55744) receives remote file input split (distance 2147483647)
>> 10:54:09,590 INFO  org.apache.flink.runtime.jobmanager.splitassigner.InputSplitManager  - CHAIN DataSource (TextInputFormat (hdfs:/user/rwaury/input/all_catalog_140410.txt) - UTF-8) -> FlatMap (com.amadeus.pcb.join.FlightConnectionJoiner$FilteringUTCExtractor) (1/1) receives input split 128
>>
>> The job takes two large input files (~9 GB) and after filtering and
>> converting them with a FlatMap (selectivity is below 1%) it joins them each
>> twice with a small data set (< 1MB) after that the join results are joined
>> with each other. The result is about 2.7 GB.
>>
>> Any idea what causes this?
>>
>> Cheers,
>> Robert
>>
>
>

Re:

Posted by Stephan Ewen <se...@apache.org>.
Hi!

It looks like the job is running with a DOP of one.

Can you set the DOP higher? Either directly on the ExecutionEnvironment, or
(preferably) through the "-p" parameter on the command line.

You are using 0.6, is that correct? (Looks like it from the logs)

Stephan


On Mon, Oct 13, 2014 at 1:07 PM, Robert Waury <ro...@googlemail.com>
wrote:

> Hi,
>
> I performed the Yarn Setup on a cluster running Apache Hadoop
> 2.3.0-cdh5.1.3 like described on the website.
>
> I could see the allocated containers in the Yarn ResourceManger and after
> starting a Flink job via the CLI client it showed up on the Flink Dashboard.
>
> The problem is that the job which runs in about 17 minutes in my local VM
> (3 cores, 4GB RAM, input from local files) now takes about 25 minutes on
> the cluster (18 containers with 4GB and 8 cores each, input from HDFS with
> rf=5).
>
> From the Flink log it seemed all data was shuffled to a single machine
> even for FlatMap operations.
>
> log excerpt:
>
> 10:54:08,832 INFO  org.apache.flink.runtime.jobmanager.splitassigner.file.FileInputSplitList  - nceorihad06 (ipcPort=56158, dataPort=55744) receives remote file input split (distance 2147483647)
> 10:54:08,832 INFO  org.apache.flink.runtime.jobmanager.splitassigner.InputSplitManager  - CHAIN DataSource (TextInputFormat (hdfs:/user/rwaury/input/all_catalog_140410.txt) - UTF-8) -> FlatMap (com.amadeus.pcb.join.FlightConnectionJoiner$FilteringUTCExtractor) (1/1) receives input split 5
> 10:54:09,589 INFO  org.apache.flink.runtime.jobmanager.splitassigner.file.FileInputSplitList  - nceorihad06 (ipcPort=56158, dataPort=55744) receives remote file input split (distance 2147483647)
> 10:54:09,590 INFO  org.apache.flink.runtime.jobmanager.splitassigner.InputSplitManager  - CHAIN DataSource (TextInputFormat (hdfs:/user/rwaury/input/all_catalog_140410.txt) - UTF-8) -> FlatMap (com.amadeus.pcb.join.FlightConnectionJoiner$FilteringUTCExtractor) (1/1) receives input split 128
>
> The job takes two large input files (~9 GB) and after filtering and
> converting them with a FlatMap (selectivity is below 1%) it joins them each
> twice with a small data set (< 1MB) after that the join results are joined
> with each other. The result is about 2.7 GB.
>
> Any idea what causes this?
>
> Cheers,
> Robert
>