You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Julien Carme <ju...@gmail.com> on 2014/09/21 13:43:13 UTC

Issues with partitionBy: FetchFailed

Hello,

I am facing an issue with partitionBy, it is not clear whether it is a
problem with my code or with my spark setup. I am using Spark 1.1,
standalone, and my other spark projects work fine.

So I have to repartition a relatively large file (about 70 million lines).
Here is a minimal version of what is not working fine:

myRDD = sc.textFile("...").map { line => (extractKey(line),line) }
myRepartitionedRDD = myRDD.partitionBy(new HashPartitioner(100))
myRepartitionedRDD.saveAsTextFile(...)

It runs quite some time, until I get some errors and it retries. Errors are:

FetchFailed(BlockManagerId(3,myWorker2, 52082,0),
shuffleId=1,mapId=1,reduceId=5)

Logs are not much more infomrative. I get:

Java.io.IOException : sendMessageReliability failed because ack was not
received within 60 sec

I get similar errors with all my workers.

Do you have some kind of explanation for this behaviour? What could be
wrong?

Thanks,

RE: Issues with partitionBy: FetchFailed

Posted by "Shao, Saisai" <sa...@intel.com>.
Hi Andrew, I will try again using jstack.

My question is that will this deadlock issue also lead to FetchFailed exception?

Thanks
Jerry

From: Andrew Ash [mailto:andrew@andrewash.com]
Sent: Tuesday, September 23, 2014 8:29 AM
To: Shao, Saisai
Cc: David Rowe; Julien Carme; user@spark.apache.org
Subject: Re: Issues with partitionBy: FetchFailed

Hi Jerry,

For the one executor hung with one CPU core running at 100%, that sounds exactly like the symptoms I observed in https://issues.apache.org/jira/browse/SPARK-2546 around a deadlock with the JobConf.  The next time that happens can you take a jstack and compare with the one on the ticket?  If it looks like this I believe you've hit SPARK-2546


    at java.util.HashMap.transfer(HashMap.java:601)

    at java.util.HashMap.resize(HashMap.java:581)

    at java.util.HashMap.addEntry(HashMap.java:879)

    at java.util.HashMap.put(HashMap.java:505)

    at org.apache.hadoop.conf.Configuration.set(Configuration.java:803)

    at org.apache.hadoop.conf.Configuration.set(Configuration.java:783)

Cheers!
Andrew

On Mon, Sep 22, 2014 at 5:14 AM, Shao, Saisai <sa...@intel.com>> wrote:
I didn’t meet this issue (Too many open files) as yours, because I set a relative large open file numbers in Linux, like 640K. What I was seeing is that one executor is pausing without doing anything, all the resources are not fully used, and one cpu core is running into 100%, so I assume this process is in full gc.

And the exception I met is FetchFailed, as I set a large value of “spark.core.connection.ack.wait.timeout”, this FetchFailed exception is relieved.

Since in the current master branch, connection manager related code is under refactoring, so the behavior may be different from the previous code, I guess probably some potential bugs may introduced.

Thanks
Jerry

From: David Rowe [mailto:davidrowe@gmail.com<ma...@gmail.com>]
Sent: Monday, September 22, 2014 7:12 PM
To: Andrew Ash
Cc: Shao, Saisai; Julien Carme; user@spark.apache.org<ma...@spark.apache.org>
Subject: Re: Issues with partitionBy: FetchFailed

Yep, this is what I was seeing. I'll experiment tomorrow with a version prior to the changeset in that ticket.

On Mon, Sep 22, 2014 at 8:29 PM, Andrew Ash <an...@andrewash.com>> wrote:
Hi David and Saisai,

Are the exceptions you two are observing similar to the first one at https://issues.apache.org/jira/browse/SPARK-3633 ?  Copied below:


14/09/19 12:10:38 WARN TaskSetManager: Lost task 51.0 in stage 2.1 (TID 552, c1705.halxg.cloudera.com<http://c1705.halxg.cloudera.com>): FetchFailed(BlockManagerId(1, c1706.halxg.cloudera.com<http://c1706.halxg.cloudera.com>, 49612, 0), shuffleId=3, mapId=75, reduceId=120)

I'm seeing the same using Spark SQL on 1.1.0 -- I think there may have been a regression in 1.1 because the same SQL query works on the same cluster when back on 1.0.2

Thanks!
Andrew

On Sun, Sep 21, 2014 at 5:15 AM, David Rowe <da...@gmail.com>> wrote:
Hi,

I've seen this problem before, and I'm not convinced it's GC.

When spark shuffles it writes a lot of small files to store the data to be sent to other executors (AFAICT). According to what I've read around the place the intention is that these files be stored in disk buffers, and since sync() is never called, they exist only in memory. The problem is when you have a lot of shuffle data, and the executors are configured to use, say 90% of your memory, one of those is going to be written to disk - either the JVM will be swapped out, or the files will be written out of cache.

So, when these nodes are timing out, it's not a GC problem, it's that the machine is actually thrashing.

I've had some success with this problem by reducing the amount of memory that the executors are configured to use from say 90% to 60%. I don't know the internals of the code, but I'm sure this number is related to the fraction of your data that's going to be shuffled to other nodes. In any case, it's not something that I can estimate in my own jobs very well - I usually have to find the right number by trial and error.

Perhaps somebody who knows the internals a bit better can shed some light.

Cheers

Dave

On Sun, Sep 21, 2014 at 9:54 PM, Shao, Saisai <sa...@intel.com>> wrote:
Hi,

I’ve also met this problem before, I think you can try to set “spark.core.connection.ack.wait.timeout” to a large value to avoid ack timeout, default is 60 seconds.

Sometimes because of GC pause or some other reasons, acknowledged message will be timeout, which will lead to this exception, you can try setting a large value of this configuration.

Thanks
Jerry

From: Julien Carme [mailto:julien.carme@gmail.com<ma...@gmail.com>]
Sent: Sunday, September 21, 2014 7:43 PM
To: user@spark.apache.org<ma...@spark.apache.org>
Subject: Issues with partitionBy: FetchFailed

Hello,
I am facing an issue with partitionBy, it is not clear whether it is a problem with my code or with my spark setup. I am using Spark 1.1, standalone, and my other spark projects work fine.
So I have to repartition a relatively large file (about 70 million lines). Here is a minimal version of what is not working fine:
myRDD = sc.textFile("...").map { line => (extractKey(line),line) }
myRepartitionedRDD = myRDD.partitionBy(new HashPartitioner(100))
myRepartitionedRDD.saveAsTextFile(...)
It runs quite some time, until I get some errors and it retries. Errors are:
FetchFailed(BlockManagerId(3,myWorker2, 52082,0), shuffleId=1,mapId=1,reduceId=5)
Logs are not much more infomrative. I get:

Java.io.IOException : sendMessageReliability failed because ack was not received within 60 sec

I get similar errors with all my workers.
Do you have some kind of explanation for this behaviour? What could be wrong?
Thanks,







Re: Issues with partitionBy: FetchFailed

Posted by Andrew Ash <an...@andrewash.com>.
Hi Jerry,

For the one executor hung with one CPU core running at 100%, that sounds
exactly like the symptoms I observed in
https://issues.apache.org/jira/browse/SPARK-2546 around a deadlock with the
JobConf.  The next time that happens can you take a jstack and compare with
the one on the ticket?  If it looks like this I believe you've hit
SPARK-2546

    at java.util.HashMap.transfer(HashMap.java:601)
    at java.util.HashMap.resize(HashMap.java:581)
    at java.util.HashMap.addEntry(HashMap.java:879)
    at java.util.HashMap.put(HashMap.java:505)
    at org.apache.hadoop.conf.Configuration.set(Configuration.java:803)
    at org.apache.hadoop.conf.Configuration.set(Configuration.java:783)


Cheers!
Andrew

On Mon, Sep 22, 2014 at 5:14 AM, Shao, Saisai <sa...@intel.com> wrote:

>  I didn’t meet this issue (Too many open files) as yours, because I set a
> relative large open file numbers in Linux, like 640K. What I was seeing is
> that one executor is pausing without doing anything, all the resources are
> not fully used, and one cpu core is running into 100%, so I assume this
> process is in full gc.
>
>
>
> And the exception I met is FetchFailed, as I set a large value of “
> spark.core.connection.ack.wait.timeout”, this FetchFailed exception is
> relieved.
>
>
>
> Since in the current master branch, connection manager related code is
> under refactoring, so the behavior may be different from the previous code,
> I guess probably some potential bugs may introduced.
>
>
>
> Thanks
>
> Jerry
>
>
>
> *From:* David Rowe [mailto:davidrowe@gmail.com]
> *Sent:* Monday, September 22, 2014 7:12 PM
> *To:* Andrew Ash
> *Cc:* Shao, Saisai; Julien Carme; user@spark.apache.org
> *Subject:* Re: Issues with partitionBy: FetchFailed
>
>
>
> Yep, this is what I was seeing. I'll experiment tomorrow with a version
> prior to the changeset in that ticket.
>
>
>
> On Mon, Sep 22, 2014 at 8:29 PM, Andrew Ash <an...@andrewash.com> wrote:
>
>  Hi David and Saisai,
>
>
>
> Are the exceptions you two are observing similar to the first one at
> https://issues.apache.org/jira/browse/SPARK-3633 ?  Copied below:
>
>
>
> 14/09/19 12:10:38 WARN TaskSetManager: Lost task 51.0 in stage 2.1 (TID 552, c1705.halxg.cloudera.com): FetchFailed(BlockManagerId(1, c1706.halxg.cloudera.com, 49612, 0), shuffleId=3, mapId=75, reduceId=120)
>
>
>
> I'm seeing the same using Spark SQL on 1.1.0 -- I think there may have
> been a regression in 1.1 because the same SQL query works on the same
> cluster when back on 1.0.2
>
>
>
> Thanks!
>
> Andrew
>
>
>
> On Sun, Sep 21, 2014 at 5:15 AM, David Rowe <da...@gmail.com> wrote:
>
>  Hi,
>
>
>
> I've seen this problem before, and I'm not convinced it's GC.
>
>
>
> When spark shuffles it writes a lot of small files to store the data to be
> sent to other executors (AFAICT). According to what I've read around the
> place the intention is that these files be stored in disk buffers, and
> since sync() is never called, they exist only in memory. The problem is
> when you have a lot of shuffle data, and the executors are configured to
> use, say 90% of your memory, one of those is going to be written to disk -
> either the JVM will be swapped out, or the files will be written out of
> cache.
>
>
>
> So, when these nodes are timing out, it's not a GC problem, it's that the
> machine is actually thrashing.
>
>
>
> I've had some success with this problem by reducing the amount of memory
> that the executors are configured to use from say 90% to 60%. I don't know
> the internals of the code, but I'm sure this number is related to the
> fraction of your data that's going to be shuffled to other nodes. In any
> case, it's not something that I can estimate in my own jobs very well - I
> usually have to find the right number by trial and error.
>
>
>
> Perhaps somebody who knows the internals a bit better can shed some light.
>
>
>
> Cheers
>
>
>
> Dave
>
>
>
> On Sun, Sep 21, 2014 at 9:54 PM, Shao, Saisai <sa...@intel.com>
> wrote:
>
>  Hi,
>
>
>
> I’ve also met this problem before, I think you can try to set
> “spark.core.connection.ack.wait.timeout” to a large value to avoid ack
> timeout, default is 60 seconds.
>
>
>
> Sometimes because of GC pause or some other reasons, acknowledged message
> will be timeout, which will lead to this exception, you can try setting a
> large value of this configuration.
>
>
>
> Thanks
>
> Jerry
>
>
>
> *From:* Julien Carme [mailto:julien.carme@gmail.com]
> *Sent:* Sunday, September 21, 2014 7:43 PM
> *To:* user@spark.apache.org
> *Subject:* Issues with partitionBy: FetchFailed
>
>
>
> Hello,
>
> I am facing an issue with partitionBy, it is not clear whether it is a
> problem with my code or with my spark setup. I am using Spark 1.1,
> standalone, and my other spark projects work fine.
>
> So I have to repartition a relatively large file (about 70 million lines).
> Here is a minimal version of what is not working fine:
>
> myRDD = sc.textFile("...").map { line => (extractKey(line),line) }
>
> myRepartitionedRDD = myRDD.partitionBy(new HashPartitioner(100))
>
> myRepartitionedRDD.saveAsTextFile(...)
>
> It runs quite some time, until I get some errors and it retries. Errors
> are:
>
> FetchFailed(BlockManagerId(3,myWorker2, 52082,0),
> shuffleId=1,mapId=1,reduceId=5)
>
> Logs are not much more infomrative. I get:
>
> Java.io.IOException : sendMessageReliability failed because ack was not
> received within 60 sec
>
>
>
> I get similar errors with all my workers.
>
> Do you have some kind of explanation for this behaviour? What could be
> wrong?
>
> Thanks,
>
>
>
>
>
>
>
>
>
>
>

Re: Issues with partitionBy: FetchFailed

Posted by David Rowe <da...@gmail.com>.
So, spinning up an identical cluster with spark 1.0.1, my job fails with
the error:

14/09/22 12:30:28 WARN scheduler.TaskSetManager: Lost task 217.0 in stage
4.0 (TID 66613, ip-10-251-34-68.ap-southeast-2.compute.internal):
org.apache.spark.SparkException: Error communicating with MapOutputTracker
  org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:111)
 etc etc

during the shuffle phase.

IIRC, this is where I was getting the FetchFailed errors before.


On Mon, Sep 22, 2014 at 10:14 PM, Shao, Saisai <sa...@intel.com>
wrote:

>  I didn’t meet this issue (Too many open files) as yours, because I set a
> relative large open file numbers in Linux, like 640K. What I was seeing is
> that one executor is pausing without doing anything, all the resources are
> not fully used, and one cpu core is running into 100%, so I assume this
> process is in full gc.
>
>
>
> And the exception I met is FetchFailed, as I set a large value of “
> spark.core.connection.ack.wait.timeout”, this FetchFailed exception is
> relieved.
>
>
>
> Since in the current master branch, connection manager related code is
> under refactoring, so the behavior may be different from the previous code,
> I guess probably some potential bugs may introduced.
>
>
>
> Thanks
>
> Jerry
>
>
>
> *From:* David Rowe [mailto:davidrowe@gmail.com]
> *Sent:* Monday, September 22, 2014 7:12 PM
> *To:* Andrew Ash
> *Cc:* Shao, Saisai; Julien Carme; user@spark.apache.org
> *Subject:* Re: Issues with partitionBy: FetchFailed
>
>
>
> Yep, this is what I was seeing. I'll experiment tomorrow with a version
> prior to the changeset in that ticket.
>
>
>
> On Mon, Sep 22, 2014 at 8:29 PM, Andrew Ash <an...@andrewash.com> wrote:
>
>  Hi David and Saisai,
>
>
>
> Are the exceptions you two are observing similar to the first one at
> https://issues.apache.org/jira/browse/SPARK-3633 ?  Copied below:
>
>
>
> 14/09/19 12:10:38 WARN TaskSetManager: Lost task 51.0 in stage 2.1 (TID 552, c1705.halxg.cloudera.com): FetchFailed(BlockManagerId(1, c1706.halxg.cloudera.com, 49612, 0), shuffleId=3, mapId=75, reduceId=120)
>
>
>
> I'm seeing the same using Spark SQL on 1.1.0 -- I think there may have
> been a regression in 1.1 because the same SQL query works on the same
> cluster when back on 1.0.2
>
>
>
> Thanks!
>
> Andrew
>
>
>
> On Sun, Sep 21, 2014 at 5:15 AM, David Rowe <da...@gmail.com> wrote:
>
>  Hi,
>
>
>
> I've seen this problem before, and I'm not convinced it's GC.
>
>
>
> When spark shuffles it writes a lot of small files to store the data to be
> sent to other executors (AFAICT). According to what I've read around the
> place the intention is that these files be stored in disk buffers, and
> since sync() is never called, they exist only in memory. The problem is
> when you have a lot of shuffle data, and the executors are configured to
> use, say 90% of your memory, one of those is going to be written to disk -
> either the JVM will be swapped out, or the files will be written out of
> cache.
>
>
>
> So, when these nodes are timing out, it's not a GC problem, it's that the
> machine is actually thrashing.
>
>
>
> I've had some success with this problem by reducing the amount of memory
> that the executors are configured to use from say 90% to 60%. I don't know
> the internals of the code, but I'm sure this number is related to the
> fraction of your data that's going to be shuffled to other nodes. In any
> case, it's not something that I can estimate in my own jobs very well - I
> usually have to find the right number by trial and error.
>
>
>
> Perhaps somebody who knows the internals a bit better can shed some light.
>
>
>
> Cheers
>
>
>
> Dave
>
>
>
> On Sun, Sep 21, 2014 at 9:54 PM, Shao, Saisai <sa...@intel.com>
> wrote:
>
>  Hi,
>
>
>
> I’ve also met this problem before, I think you can try to set
> “spark.core.connection.ack.wait.timeout” to a large value to avoid ack
> timeout, default is 60 seconds.
>
>
>
> Sometimes because of GC pause or some other reasons, acknowledged message
> will be timeout, which will lead to this exception, you can try setting a
> large value of this configuration.
>
>
>
> Thanks
>
> Jerry
>
>
>
> *From:* Julien Carme [mailto:julien.carme@gmail.com]
> *Sent:* Sunday, September 21, 2014 7:43 PM
> *To:* user@spark.apache.org
> *Subject:* Issues with partitionBy: FetchFailed
>
>
>
> Hello,
>
> I am facing an issue with partitionBy, it is not clear whether it is a
> problem with my code or with my spark setup. I am using Spark 1.1,
> standalone, and my other spark projects work fine.
>
> So I have to repartition a relatively large file (about 70 million lines).
> Here is a minimal version of what is not working fine:
>
> myRDD = sc.textFile("...").map { line => (extractKey(line),line) }
>
> myRepartitionedRDD = myRDD.partitionBy(new HashPartitioner(100))
>
> myRepartitionedRDD.saveAsTextFile(...)
>
> It runs quite some time, until I get some errors and it retries. Errors
> are:
>
> FetchFailed(BlockManagerId(3,myWorker2, 52082,0),
> shuffleId=1,mapId=1,reduceId=5)
>
> Logs are not much more infomrative. I get:
>
> Java.io.IOException : sendMessageReliability failed because ack was not
> received within 60 sec
>
>
>
> I get similar errors with all my workers.
>
> Do you have some kind of explanation for this behaviour? What could be
> wrong?
>
> Thanks,
>
>
>
>
>
>
>
>
>
>
>

RE: Issues with partitionBy: FetchFailed

Posted by "Shao, Saisai" <sa...@intel.com>.
I didn’t meet this issue (Too many open files) as yours, because I set a relative large open file numbers in Linux, like 640K. What I was seeing is that one executor is pausing without doing anything, all the resources are not fully used, and one cpu core is running into 100%, so I assume this process is in full gc.

And the exception I met is FetchFailed, as I set a large value of “spark.core.connection.ack.wait.timeout”, this FetchFailed exception is relieved.

Since in the current master branch, connection manager related code is under refactoring, so the behavior may be different from the previous code, I guess probably some potential bugs may introduced.

Thanks
Jerry

From: David Rowe [mailto:davidrowe@gmail.com]
Sent: Monday, September 22, 2014 7:12 PM
To: Andrew Ash
Cc: Shao, Saisai; Julien Carme; user@spark.apache.org
Subject: Re: Issues with partitionBy: FetchFailed

Yep, this is what I was seeing. I'll experiment tomorrow with a version prior to the changeset in that ticket.

On Mon, Sep 22, 2014 at 8:29 PM, Andrew Ash <an...@andrewash.com>> wrote:
Hi David and Saisai,

Are the exceptions you two are observing similar to the first one at https://issues.apache.org/jira/browse/SPARK-3633 ?  Copied below:


14/09/19 12:10:38 WARN TaskSetManager: Lost task 51.0 in stage 2.1 (TID 552, c1705.halxg.cloudera.com<http://c1705.halxg.cloudera.com>): FetchFailed(BlockManagerId(1, c1706.halxg.cloudera.com<http://c1706.halxg.cloudera.com>, 49612, 0), shuffleId=3, mapId=75, reduceId=120)

I'm seeing the same using Spark SQL on 1.1.0 -- I think there may have been a regression in 1.1 because the same SQL query works on the same cluster when back on 1.0.2

Thanks!
Andrew

On Sun, Sep 21, 2014 at 5:15 AM, David Rowe <da...@gmail.com>> wrote:
Hi,

I've seen this problem before, and I'm not convinced it's GC.

When spark shuffles it writes a lot of small files to store the data to be sent to other executors (AFAICT). According to what I've read around the place the intention is that these files be stored in disk buffers, and since sync() is never called, they exist only in memory. The problem is when you have a lot of shuffle data, and the executors are configured to use, say 90% of your memory, one of those is going to be written to disk - either the JVM will be swapped out, or the files will be written out of cache.

So, when these nodes are timing out, it's not a GC problem, it's that the machine is actually thrashing.

I've had some success with this problem by reducing the amount of memory that the executors are configured to use from say 90% to 60%. I don't know the internals of the code, but I'm sure this number is related to the fraction of your data that's going to be shuffled to other nodes. In any case, it's not something that I can estimate in my own jobs very well - I usually have to find the right number by trial and error.

Perhaps somebody who knows the internals a bit better can shed some light.

Cheers

Dave

On Sun, Sep 21, 2014 at 9:54 PM, Shao, Saisai <sa...@intel.com>> wrote:
Hi,

I’ve also met this problem before, I think you can try to set “spark.core.connection.ack.wait.timeout” to a large value to avoid ack timeout, default is 60 seconds.

Sometimes because of GC pause or some other reasons, acknowledged message will be timeout, which will lead to this exception, you can try setting a large value of this configuration.

Thanks
Jerry

From: Julien Carme [mailto:julien.carme@gmail.com<ma...@gmail.com>]
Sent: Sunday, September 21, 2014 7:43 PM
To: user@spark.apache.org<ma...@spark.apache.org>
Subject: Issues with partitionBy: FetchFailed

Hello,
I am facing an issue with partitionBy, it is not clear whether it is a problem with my code or with my spark setup. I am using Spark 1.1, standalone, and my other spark projects work fine.
So I have to repartition a relatively large file (about 70 million lines). Here is a minimal version of what is not working fine:
myRDD = sc.textFile("...").map { line => (extractKey(line),line) }
myRepartitionedRDD = myRDD.partitionBy(new HashPartitioner(100))
myRepartitionedRDD.saveAsTextFile(...)
It runs quite some time, until I get some errors and it retries. Errors are:
FetchFailed(BlockManagerId(3,myWorker2, 52082,0), shuffleId=1,mapId=1,reduceId=5)
Logs are not much more infomrative. I get:

Java.io.IOException : sendMessageReliability failed because ack was not received within 60 sec

I get similar errors with all my workers.
Do you have some kind of explanation for this behaviour? What could be wrong?
Thanks,






Re: Issues with partitionBy: FetchFailed

Posted by David Rowe <da...@gmail.com>.
Yep, this is what I was seeing. I'll experiment tomorrow with a version
prior to the changeset in that ticket.

On Mon, Sep 22, 2014 at 8:29 PM, Andrew Ash <an...@andrewash.com> wrote:

> Hi David and Saisai,
>
> Are the exceptions you two are observing similar to the first one at
> https://issues.apache.org/jira/browse/SPARK-3633 ?  Copied below:
>
> 14/09/19 12:10:38 WARN TaskSetManager: Lost task 51.0 in stage 2.1 (TID 552, c1705.halxg.cloudera.com): FetchFailed(BlockManagerId(1, c1706.halxg.cloudera.com, 49612, 0), shuffleId=3, mapId=75, reduceId=120)
>
>
> I'm seeing the same using Spark SQL on 1.1.0 -- I think there may have
> been a regression in 1.1 because the same SQL query works on the same
> cluster when back on 1.0.2
>
> Thanks!
> Andrew
>
> On Sun, Sep 21, 2014 at 5:15 AM, David Rowe <da...@gmail.com> wrote:
>
>> Hi,
>>
>> I've seen this problem before, and I'm not convinced it's GC.
>>
>> When spark shuffles it writes a lot of small files to store the data to
>> be sent to other executors (AFAICT). According to what I've read around the
>> place the intention is that these files be stored in disk buffers, and
>> since sync() is never called, they exist only in memory. The problem is
>> when you have a lot of shuffle data, and the executors are configured to
>> use, say 90% of your memory, one of those is going to be written to disk -
>> either the JVM will be swapped out, or the files will be written out of
>> cache.
>>
>> So, when these nodes are timing out, it's not a GC problem, it's that the
>> machine is actually thrashing.
>>
>> I've had some success with this problem by reducing the amount of memory
>> that the executors are configured to use from say 90% to 60%. I don't know
>> the internals of the code, but I'm sure this number is related to the
>> fraction of your data that's going to be shuffled to other nodes. In any
>> case, it's not something that I can estimate in my own jobs very well - I
>> usually have to find the right number by trial and error.
>>
>> Perhaps somebody who knows the internals a bit better can shed some light.
>>
>> Cheers
>>
>> Dave
>>
>> On Sun, Sep 21, 2014 at 9:54 PM, Shao, Saisai <sa...@intel.com>
>> wrote:
>>
>>>  Hi,
>>>
>>>
>>>
>>> I’ve also met this problem before, I think you can try to set
>>> “spark.core.connection.ack.wait.timeout” to a large value to avoid ack
>>> timeout, default is 60 seconds.
>>>
>>>
>>>
>>> Sometimes because of GC pause or some other reasons, acknowledged
>>> message will be timeout, which will lead to this exception, you can try
>>> setting a large value of this configuration.
>>>
>>>
>>>
>>> Thanks
>>>
>>> Jerry
>>>
>>>
>>>
>>> *From:* Julien Carme [mailto:julien.carme@gmail.com]
>>> *Sent:* Sunday, September 21, 2014 7:43 PM
>>> *To:* user@spark.apache.org
>>> *Subject:* Issues with partitionBy: FetchFailed
>>>
>>>
>>>
>>> Hello,
>>>
>>> I am facing an issue with partitionBy, it is not clear whether it is a
>>> problem with my code or with my spark setup. I am using Spark 1.1,
>>> standalone, and my other spark projects work fine.
>>>
>>> So I have to repartition a relatively large file (about 70 million
>>> lines). Here is a minimal version of what is not working fine:
>>>
>>> myRDD = sc.textFile("...").map { line => (extractKey(line),line) }
>>>
>>> myRepartitionedRDD = myRDD.partitionBy(new HashPartitioner(100))
>>>
>>> myRepartitionedRDD.saveAsTextFile(...)
>>>
>>> It runs quite some time, until I get some errors and it retries. Errors
>>> are:
>>>
>>> FetchFailed(BlockManagerId(3,myWorker2, 52082,0),
>>> shuffleId=1,mapId=1,reduceId=5)
>>>
>>> Logs are not much more infomrative. I get:
>>>
>>> Java.io.IOException : sendMessageReliability failed because ack was not
>>> received within 60 sec
>>>
>>>
>>>
>>> I get similar errors with all my workers.
>>>
>>> Do you have some kind of explanation for this behaviour? What could be
>>> wrong?
>>>
>>> Thanks,
>>>
>>>
>>>
>>>
>>>
>>
>>
>

Re: Issues with partitionBy: FetchFailed

Posted by Andrew Ash <an...@andrewash.com>.
Hi David and Saisai,

Are the exceptions you two are observing similar to the first one at
https://issues.apache.org/jira/browse/SPARK-3633 ?  Copied below:

14/09/19 12:10:38 WARN TaskSetManager: Lost task 51.0 in stage 2.1
(TID 552, c1705.halxg.cloudera.com): FetchFailed(BlockManagerId(1,
c1706.halxg.cloudera.com, 49612, 0), shuffleId=3, mapId=75,
reduceId=120)


I'm seeing the same using Spark SQL on 1.1.0 -- I think there may have been
a regression in 1.1 because the same SQL query works on the same cluster
when back on 1.0.2

Thanks!
Andrew

On Sun, Sep 21, 2014 at 5:15 AM, David Rowe <da...@gmail.com> wrote:

> Hi,
>
> I've seen this problem before, and I'm not convinced it's GC.
>
> When spark shuffles it writes a lot of small files to store the data to be
> sent to other executors (AFAICT). According to what I've read around the
> place the intention is that these files be stored in disk buffers, and
> since sync() is never called, they exist only in memory. The problem is
> when you have a lot of shuffle data, and the executors are configured to
> use, say 90% of your memory, one of those is going to be written to disk -
> either the JVM will be swapped out, or the files will be written out of
> cache.
>
> So, when these nodes are timing out, it's not a GC problem, it's that the
> machine is actually thrashing.
>
> I've had some success with this problem by reducing the amount of memory
> that the executors are configured to use from say 90% to 60%. I don't know
> the internals of the code, but I'm sure this number is related to the
> fraction of your data that's going to be shuffled to other nodes. In any
> case, it's not something that I can estimate in my own jobs very well - I
> usually have to find the right number by trial and error.
>
> Perhaps somebody who knows the internals a bit better can shed some light.
>
> Cheers
>
> Dave
>
> On Sun, Sep 21, 2014 at 9:54 PM, Shao, Saisai <sa...@intel.com>
> wrote:
>
>>  Hi,
>>
>>
>>
>> I’ve also met this problem before, I think you can try to set
>> “spark.core.connection.ack.wait.timeout” to a large value to avoid ack
>> timeout, default is 60 seconds.
>>
>>
>>
>> Sometimes because of GC pause or some other reasons, acknowledged message
>> will be timeout, which will lead to this exception, you can try setting a
>> large value of this configuration.
>>
>>
>>
>> Thanks
>>
>> Jerry
>>
>>
>>
>> *From:* Julien Carme [mailto:julien.carme@gmail.com]
>> *Sent:* Sunday, September 21, 2014 7:43 PM
>> *To:* user@spark.apache.org
>> *Subject:* Issues with partitionBy: FetchFailed
>>
>>
>>
>> Hello,
>>
>> I am facing an issue with partitionBy, it is not clear whether it is a
>> problem with my code or with my spark setup. I am using Spark 1.1,
>> standalone, and my other spark projects work fine.
>>
>> So I have to repartition a relatively large file (about 70 million
>> lines). Here is a minimal version of what is not working fine:
>>
>> myRDD = sc.textFile("...").map { line => (extractKey(line),line) }
>>
>> myRepartitionedRDD = myRDD.partitionBy(new HashPartitioner(100))
>>
>> myRepartitionedRDD.saveAsTextFile(...)
>>
>> It runs quite some time, until I get some errors and it retries. Errors
>> are:
>>
>> FetchFailed(BlockManagerId(3,myWorker2, 52082,0),
>> shuffleId=1,mapId=1,reduceId=5)
>>
>> Logs are not much more infomrative. I get:
>>
>> Java.io.IOException : sendMessageReliability failed because ack was not
>> received within 60 sec
>>
>>
>>
>> I get similar errors with all my workers.
>>
>> Do you have some kind of explanation for this behaviour? What could be
>> wrong?
>>
>> Thanks,
>>
>>
>>
>>
>>
>
>

Re: Issues with partitionBy: FetchFailed

Posted by David Rowe <da...@gmail.com>.
Hi,

I've seen this problem before, and I'm not convinced it's GC.

When spark shuffles it writes a lot of small files to store the data to be
sent to other executors (AFAICT). According to what I've read around the
place the intention is that these files be stored in disk buffers, and
since sync() is never called, they exist only in memory. The problem is
when you have a lot of shuffle data, and the executors are configured to
use, say 90% of your memory, one of those is going to be written to disk -
either the JVM will be swapped out, or the files will be written out of
cache.

So, when these nodes are timing out, it's not a GC problem, it's that the
machine is actually thrashing.

I've had some success with this problem by reducing the amount of memory
that the executors are configured to use from say 90% to 60%. I don't know
the internals of the code, but I'm sure this number is related to the
fraction of your data that's going to be shuffled to other nodes. In any
case, it's not something that I can estimate in my own jobs very well - I
usually have to find the right number by trial and error.

Perhaps somebody who knows the internals a bit better can shed some light.

Cheers

Dave

On Sun, Sep 21, 2014 at 9:54 PM, Shao, Saisai <sa...@intel.com> wrote:

>  Hi,
>
>
>
> I’ve also met this problem before, I think you can try to set
> “spark.core.connection.ack.wait.timeout” to a large value to avoid ack
> timeout, default is 60 seconds.
>
>
>
> Sometimes because of GC pause or some other reasons, acknowledged message
> will be timeout, which will lead to this exception, you can try setting a
> large value of this configuration.
>
>
>
> Thanks
>
> Jerry
>
>
>
> *From:* Julien Carme [mailto:julien.carme@gmail.com]
> *Sent:* Sunday, September 21, 2014 7:43 PM
> *To:* user@spark.apache.org
> *Subject:* Issues with partitionBy: FetchFailed
>
>
>
> Hello,
>
> I am facing an issue with partitionBy, it is not clear whether it is a
> problem with my code or with my spark setup. I am using Spark 1.1,
> standalone, and my other spark projects work fine.
>
> So I have to repartition a relatively large file (about 70 million lines).
> Here is a minimal version of what is not working fine:
>
> myRDD = sc.textFile("...").map { line => (extractKey(line),line) }
>
> myRepartitionedRDD = myRDD.partitionBy(new HashPartitioner(100))
>
> myRepartitionedRDD.saveAsTextFile(...)
>
> It runs quite some time, until I get some errors and it retries. Errors
> are:
>
> FetchFailed(BlockManagerId(3,myWorker2, 52082,0),
> shuffleId=1,mapId=1,reduceId=5)
>
> Logs are not much more infomrative. I get:
>
> Java.io.IOException : sendMessageReliability failed because ack was not
> received within 60 sec
>
>
>
> I get similar errors with all my workers.
>
> Do you have some kind of explanation for this behaviour? What could be
> wrong?
>
> Thanks,
>
>
>
>
>

RE: Issues with partitionBy: FetchFailed

Posted by "Shao, Saisai" <sa...@intel.com>.
Hi,

I’ve also met this problem before, I think you can try to set “spark.core.connection.ack.wait.timeout” to a large value to avoid ack timeout, default is 60 seconds.

Sometimes because of GC pause or some other reasons, acknowledged message will be timeout, which will lead to this exception, you can try setting a large value of this configuration.

Thanks
Jerry

From: Julien Carme [mailto:julien.carme@gmail.com]
Sent: Sunday, September 21, 2014 7:43 PM
To: user@spark.apache.org
Subject: Issues with partitionBy: FetchFailed

Hello,
I am facing an issue with partitionBy, it is not clear whether it is a problem with my code or with my spark setup. I am using Spark 1.1, standalone, and my other spark projects work fine.
So I have to repartition a relatively large file (about 70 million lines). Here is a minimal version of what is not working fine:
myRDD = sc.textFile("...").map { line => (extractKey(line),line) }
myRepartitionedRDD = myRDD.partitionBy(new HashPartitioner(100))
myRepartitionedRDD.saveAsTextFile(...)
It runs quite some time, until I get some errors and it retries. Errors are:
FetchFailed(BlockManagerId(3,myWorker2, 52082,0), shuffleId=1,mapId=1,reduceId=5)
Logs are not much more infomrative. I get:

Java.io.IOException : sendMessageReliability failed because ack was not received within 60 sec

I get similar errors with all my workers.
Do you have some kind of explanation for this behaviour? What could be wrong?
Thanks,