Posted to user@flink.apache.org by Stephan Ewen <se...@apache.org> on 2014/10/13 13:23:10 UTC

Re:

Hi!

It looks like the job is running with a DOP of one.

Can you set the DOP higher? Either directly on the ExecutionEnvironment, or
(preferably) through the "-p" parameter on the command line.
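
To illustrate why a DOP of one matters here: the "(1/1)" in the log lines
means the DataSource -> FlatMap chain has a single parallel subtask, so that
one subtask receives every input split. The Java sketch below is not Flink
code. It only mimics the effect with a simple round-robin assignment, whereas
Flink's real split assigner hands out splits on request and takes locality
into account. The class name, the method name, and the split count of 129 are
made up for illustration.

```java
import java.util.Arrays;

// Illustrative sketch only; this is NOT Flink's actual input split assigner.
final class SplitAssignmentSketch {

    // Distributes numSplits input splits over dop parallel subtasks in
    // round-robin order and returns how many splits each subtask has to read.
    static int[] splitsPerSubtask(int numSplits, int dop) {
        if (numSplits < 0 || dop < 1) {
            throw new IllegalArgumentException("numSplits must be >= 0 and dop >= 1");
        }
        int[] counts = new int[dop];
        for (int split = 0; split < numSplits; split++) {
            counts[split % dop]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        // Hypothetical number of splits; the log only shows that a split
        // numbered 128 exists.
        int numSplits = 129;
        // DOP 1: a single subtask reads all splits.
        System.out.println(Arrays.toString(splitsPerSubtask(numSplits, 1)));
        // DOP 4: the same splits are spread over four subtasks.
        System.out.println(Arrays.toString(splitsPerSubtask(numSplits, 4)));
    }
}
```

With a DOP of 1 the single subtask ends up with all 129 splits; with a DOP of
4 each subtask gets 32 or 33 of them.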

You are using 0.6, is that correct? (Looks like it from the logs)

Stephan


On Mon, Oct 13, 2014 at 1:07 PM, Robert Waury <ro...@googlemail.com>
wrote:

> Hi,
>
> I performed the Yarn Setup on a cluster running Apache Hadoop
> 2.3.0-cdh5.1.3 as described on the website.
>
> I could see the allocated containers in the Yarn ResourceManager and after
> starting a Flink job via the CLI client it showed up on the Flink Dashboard.
>
> The problem is that the job which runs in about 17 minutes in my local VM
> (3 cores, 4GB RAM, input from local files) now takes about 25 minutes on
> the cluster (18 containers with 4GB and 8 cores each, input from HDFS with
> rf=5).
>
> From the Flink log it seemed all data was shuffled to a single machine
> even for FlatMap operations.
>
> log excerpt:
>
> 10:54:08,832 INFO  org.apache.flink.runtime.jobmanager.splitassigner.file.FileInputSplitList  - nceorihad06 (ipcPort=56158, dataPort=55744) receives remote file input split (distance 2147483647)
> 10:54:08,832 INFO  org.apache.flink.runtime.jobmanager.splitassigner.InputSplitManager  - CHAIN DataSource (TextInputFormat (hdfs:/user/rwaury/input/all_catalog_140410.txt) - UTF-8) -> FlatMap (com.amadeus.pcb.join.FlightConnectionJoiner$FilteringUTCExtractor) (1/1) receives input split 5
> 10:54:09,589 INFO  org.apache.flink.runtime.jobmanager.splitassigner.file.FileInputSplitList  - nceorihad06 (ipcPort=56158, dataPort=55744) receives remote file input split (distance 2147483647)
> 10:54:09,590 INFO  org.apache.flink.runtime.jobmanager.splitassigner.InputSplitManager  - CHAIN DataSource (TextInputFormat (hdfs:/user/rwaury/input/all_catalog_140410.txt) - UTF-8) -> FlatMap (com.amadeus.pcb.join.FlightConnectionJoiner$FilteringUTCExtractor) (1/1) receives input split 128
>
> The job takes two large input files (~9 GB) and, after filtering and
> converting them with a FlatMap (selectivity is below 1%), joins each of
> them twice with a small data set (< 1MB). After that, the join results are
> joined with each other. The result is about 2.7 GB.
>
> Any idea what causes this?
>
> Cheers,
> Robert
>

Re:

Posted by Robert Metzger <rm...@apache.org>.
Are you referring to https://issues.apache.org/jira/browse/FLINK-968? As
I said, users can pass the "-s" parameter to set the number of slots
per container, and that number is used by the CliFrontend.


I just found out that only the YARN cluster setup page mentions slots at
all.
So how slots are used is basically not documented, even though it is a very
important concept for properly configuring and running Flink. ( -->
https://issues.apache.org/jira/browse/FLINK-1157)

On Mon, Oct 13, 2014 at 2:13 PM, Stephan Ewen <se...@apache.org> wrote:

> There is a ticket open for that, to configure the default DOP based on the
> number of containers and slots. It is not implemented yet, though.
>
>
>
> On Mon, Oct 13, 2014 at 2:09 PM, Robert Waury <robert.waury@googlemail.com> wrote:
>
>> Yes, I'm running 0.6.1
>>
>> Setting DOP manually worked, thanks.
>>
>> Computation time is now down to around 100 seconds.
>>
>> Is there a way to let Flink figure out the DOP automatically within a
>> Yarn application or do I always have to set it manually?
>>
>> Cheers,
>> Robert

Re:

Posted by Stephan Ewen <se...@apache.org>.
There is a ticket open for that, to configure the default DOP based on the
number of containers and slots. It is not implemented yet, though.
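
As a rough sketch of what such a default could look like, assuming one
TaskManager per container and a fixed number of slots per container, the
number of slots available to a job is simply their product. The class and
method names below are hypothetical, and treating each of the 8 cores as one
slot is an assumption. The 18 containers come from the original question (one
of them may actually host the JobManager rather than a TaskManager).

```java
// Illustrative sketch only; this is not part of Flink.
final class DefaultDopSketch {

    // Total number of task slots offered by the YARN session, assuming one
    // TaskManager per container with slotsPerContainer slots each.
    static int totalSlots(int taskManagerContainers, int slotsPerContainer) {
        if (taskManagerContainers < 1 || slotsPerContainer < 1) {
            throw new IllegalArgumentException("both arguments must be >= 1");
        }
        // multiplyExact throws ArithmeticException instead of silently overflowing.
        return Math.multiplyExact(taskManagerContainers, slotsPerContainer);
    }

    public static void main(String[] args) {
        // 18 containers with 8 cores each, as in the original question;
        // one slot per core is an assumption.
        System.out.println(totalSlots(18, 8));
    }
}
```

A value computed this way would be an upper bound for the DOP that the
session can serve, not necessarily the best setting for a given job.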



On Mon, Oct 13, 2014 at 2:09 PM, Robert Waury <ro...@googlemail.com>
wrote:

> Yes, I'm running 0.6.1
>
> Setting DOP manually worked, thanks.
>
> Computation time is now down to around 100 seconds.
>
> Is there a way to let Flink figure out the DOP automatically within a Yarn
> application or do I always have to set it manually?
>
> Cheers,
> Robert

Re:

Posted by Robert Metzger <rm...@apache.org>.
Not in the 0.6.1 release, no.
With the upcoming 0.7-incubating release, you can set the number of task
slots per container (-s flag), and this value will be used automatically as
the default DOP.

On Mon, Oct 13, 2014 at 2:09 PM, Robert Waury <ro...@googlemail.com>
wrote:

> Yes, I'm running 0.6.1
>
> Setting DOP manually worked, thanks.
>
> Computation time is now down to around 100 seconds.
>
> Is there a way to let Flink figure out the DOP automatically within a Yarn
> application or do I always have to set it manually?
>
> Cheers,
> Robert

Re:

Posted by Robert Waury <ro...@googlemail.com>.
Yes, I'm running 0.6.1

Setting DOP manually worked, thanks.

Computation time is now down to around 100 seconds.

Is there a way to let Flink figure out the DOP automatically within a Yarn
application or do I always have to set it manually?

Cheers,
Robert



On Mon, Oct 13, 2014 at 1:23 PM, Stephan Ewen <se...@apache.org> wrote:

> Hi!
>
> It looks like the job is running with a DOP of one.
>
> Can you set the DOP higher? Either directly on the ExecutionEnvironment,
> or (preferably) through the "-p" parameter on the command line.
>
> You are using 0.6, is that correct? (Looks like it from the logs)
>
> Stephan