Posted to user@spark.apache.org by Arun Ahuja <aa...@gmail.com> on 2014/09/04 17:50:44 UTC

Any issues with repartition?

I wanted to know if there were any known issues with repartition?

I have two RDDs loaded in my code; both load into 133 partitions.
Before processing I want to repartition both to more partitions (let's say
1000) and then just count the records.  This job seems to be failing for me
(Spark 1.0).

Code:
[image: Inline image 1]
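
(The inline image does not come through in the plain-text archive; the code
was roughly of this shape - a sketch with assumed input paths and names, not
the exact program:)

from pyspark import SparkContext

sc = SparkContext(appName="repartition-test")   # assumed app name

# Two inputs that each load into 133 partitions (paths are illustrative).
rdd_a = sc.textFile("hdfs:///data/input_a")
rdd_b = sc.textFile("hdfs:///data/input_b")

# Repartition both to more partitions, then just count the records.
print(rdd_a.repartition(1000).count())
print(rdd_b.repartition(1000).count())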

Spark UI:


[image: Inline image 2]

My questions are: 1) Why are there four repartition stages for two calls?
2) Why are there more tasks than listed in those stages, but none listed as
failed?


I see tons of exceptions in the logs, but none seem particularly helpful:

14/09/04 11:34:24 INFO ConnectionManager: key already cancelled ?
sun.nio.ch.SelectionKeyImpl@646de5cc
java.nio.channels.CancelledKeyException
        at
org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:287)

java.nio.channels.UnresolvedAddressException
        at sun.nio.ch.Net.checkAddress(Net.java:127)

java.net.ConnectException: Connection timed out
        at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
        at
sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:735)

java.nio.channels.NoConnectionPendingException
        at
sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:720)

I can run other Spark jobs to completion, so it does not appear to be a true
network issue.

Thanks,
Arun

Re: Any issues with repartition?

Posted by Akhil Das <ak...@sigmoidanalytics.com>.
After a bit of research, I figured out that one of the workers was hung on
GC cleanup, and the connection usually times out since the default is 60
seconds, so I set it to a higher value and that eliminated the issue. You
may want to try this (on the SparkConf the job is built with):

conf.set("spark.core.connection.ack.wait.timeout", "600")
conf.set("spark.akka.frameSize", "50")
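
A minimal sketch of wiring these into a PySpark job (the app name is an
assumption):

from pyspark import SparkConf, SparkContext

# Give a worker stuck in a long GC pause more time to ACK before its
# connection is treated as dead, and raise the Akka frame size.
conf = (SparkConf()
        .setAppName("repartition-job")   # illustrative name
        .set("spark.core.connection.ack.wait.timeout", "600")
        .set("spark.akka.frameSize", "50"))

sc = SparkContext(conf=conf)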


Thanks
Best Regards

On Wed, Oct 8, 2014 at 6:06 PM, jamborta <ja...@gmail.com> wrote:

> I am still puzzled on this. In my case the data is allowed to write to
> disk,
> and I usually get different errors if it is out of memory.
>
> My guess is that akka kills the executors for some reason.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Any-issues-with-repartition-tp13462p15929.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
> For additional commands, e-mail: user-help@spark.apache.org
>
>

Re: Any issues with repartition?

Posted by jamborta <ja...@gmail.com>.
I am still puzzled on this. In my case the data is allowed to write to disk,
and I usually get different errors if it is out of memory. 

My guess is that akka kills the executors for some reason.



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Any-issues-with-repartition-tp13462p15929.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Re: Any issues with repartition?

Posted by "Johnson, Dale" <da...@ebay.com>.
I would second the suggestion that one of the Spark committers weigh in.

Many times the repartition() command fails, no matter how many times I run
it.

This is more of a 0.x behavior than a 1.0.2 behavior.

Anyone?
Dale.




On 10/8/14, 1:06 AM, "Paul Wais" <pw...@yelp.com> wrote:

>Looks like an OOM issue?  Have you tried persisting your RDDs to allow
>disk writes?
>
>I've seen a lot of similar crashes in a Spark app that reads from HDFS
>and does joins.  I.e. I've seen "java.io.IOException: Filesystem
>closed," "Executor lost," "FetchFailed," etc etc with
>non-deterministic crashes.  I've tried persisting RDDs, tuning other
>params, and verifying that the Executor JVMs don't come close to their
>max allocated memory during operation.
>
>Looking through user@ tonight, there are a ton of email threads with
>similar crashes and no answers.  It looks like a lot of people are
>struggling with OOMs.
>
>Could one of the Spark committers please comment on this thread, or
>one of the other unanswered threads with similar crashes?  Is this
>simply how Spark behaves if Executors OOM?  What can the user do other
>than increase memory or reduce RDD size?  (And how can one deduce how
>much of either is needed?)
>
>One general workaround for OOMs could be to programmatically break the
>job input (i.e. from HDFS, input from #parallelize() ) into chunks,
>and only create/process RDDs related to one chunk at a time.  However,
>this approach has the limitations of Spark Streaming and no formal
>library support.  What might be nice is that if tasks fail, Spark
>could try to re-partition in order to avoid OOMs.
>
>
>
>On Fri, Oct 3, 2014 at 2:55 AM, jamborta <ja...@gmail.com> wrote:
>> I have two nodes with 96G ram 16 cores, my setup is as follows:
>>
>>     conf = (SparkConf()
>>             .setMaster("yarn-cluster")
>>             .set("spark.executor.memory", "30G")
>>             .set("spark.cores.max", 32)
>>             .set("spark.executor.instances", 2)
>>             .set("spark.executor.cores", 8)
>>             .set("spark.akka.timeout", 10000)
>>             .set("spark.akka.askTimeout", 100)
>>             .set("spark.akka.frameSize", 500)
>>             .set("spark.cleaner.ttl", 86400)
>>             .set("spark.task.maxFailures", 16)
>>             .set("spark.worker.timeout", 150))
>>
>> thanks a lot,
>>
>>
>>
>>
>> --
>> View this message in context:
>>http://apache-spark-user-list.1001560.n3.nabble.com/Any-issues-with-repar
>>tition-tp13462p15674.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> ---------------------------------------------------------------------
>> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
>> For additional commands, e-mail: user-help@spark.apache.org
>>
>
>---------------------------------------------------------------------
>To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
>For additional commands, e-mail: user-help@spark.apache.org
>


---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Re: Any issues with repartition?

Posted by Paul Wais <pw...@yelp.com>.
Looks like an OOM issue?  Have you tried persisting your RDDs to allow
disk writes?
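
For example, a minimal sketch of persisting with a disk-backed storage level
(the input path is an assumption, and an existing SparkContext sc is
assumed):

from pyspark import StorageLevel

rdd = sc.textFile("hdfs:///data/input")
# Partitions that don't fit in executor memory spill to local disk instead
# of forcing recomputation or contributing to OOMs.
rdd.persist(StorageLevel.MEMORY_AND_DISK)
rdd.count()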

I've seen a lot of similar crashes in a Spark app that reads from HDFS
and does joins.  I.e. I've seen "java.io.IOException: Filesystem
closed," "Executor lost," "FetchFailed," etc etc with
non-deterministic crashes.  I've tried persisting RDDs, tuning other
params, and verifying that the Executor JVMs don't come close to their
max allocated memory during operation.

Looking through user@ tonight, there are a ton of email threads with
similar crashes and no answers.  It looks like a lot of people are
struggling with OOMs.

Could one of the Spark committers please comment on this thread, or
one of the other unanswered threads with similar crashes?  Is this
simply how Spark behaves if Executors OOM?  What can the user do other
than increase memory or reduce RDD size?  (And how can one deduce how
much of either is needed?)

One general workaround for OOMs could be to programmatically break the
job input (i.e. from HDFS, input from #parallelize() ) into chunks,
and only create/process RDDs related to one chunk at a time.  However,
this approach has the limitations of Spark Streaming and no formal
library support.  What might be nice is that if tasks fail, Spark
could try to re-partition in order to avoid OOMs.
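
A rough sketch of that chunking idea (the file layout, the chunk size, and
an existing SparkContext sc are all assumptions for illustration):

# Process the input a few files at a time instead of as one huge RDD.
input_paths = ["hdfs:///data/part-%05d" % i for i in range(100)]
chunk_size = 10

total = 0
for start in range(0, len(input_paths), chunk_size):
    chunk = input_paths[start:start + chunk_size]
    # textFile accepts a comma-separated list of paths, so each iteration
    # builds and processes an RDD over just this chunk.
    total += sc.textFile(",".join(chunk)).count()

print(total)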



On Fri, Oct 3, 2014 at 2:55 AM, jamborta <ja...@gmail.com> wrote:
> I have two nodes with 96G ram 16 cores, my setup is as follows:
>
>     conf = (SparkConf()
>             .setMaster("yarn-cluster")
>             .set("spark.executor.memory", "30G")
>             .set("spark.cores.max", 32)
>             .set("spark.executor.instances", 2)
>             .set("spark.executor.cores", 8)
>             .set("spark.akka.timeout", 10000)
>             .set("spark.akka.askTimeout", 100)
>             .set("spark.akka.frameSize", 500)
>             .set("spark.cleaner.ttl", 86400)
>             .set("spark.task.maxFailures", 16)
>             .set("spark.worker.timeout", 150))
>
> thanks a lot,
>
>
>
>
> --
> View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Any-issues-with-repartition-tp13462p15674.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
> For additional commands, e-mail: user-help@spark.apache.org
>

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Re: Any issues with repartition?

Posted by jamborta <ja...@gmail.com>.
I have two nodes with 96G RAM and 16 cores; my setup is as follows:

    conf = (SparkConf()
            .setMaster("yarn-cluster")
            .set("spark.executor.memory", "30G")
            .set("spark.cores.max", 32)
            .set("spark.executor.instances", 2)
            .set("spark.executor.cores", 8)
            .set("spark.akka.timeout", 10000)
            .set("spark.akka.askTimeout", 100)
            .set("spark.akka.frameSize", 500)
            .set("spark.cleaner.ttl", 86400)
            .set("spark.task.maxFailures", 16)
            .set("spark.worker.timeout", 150))
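
A conf built like this is then handed to the context when it is created; as
a quick sketch:

    from pyspark import SparkContext
    # Pass the conf defined above to the context (an app name would also be
    # set, e.g. via conf.setAppName("..."), which is assumed here).
    sc = SparkContext(conf=conf)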

thanks a lot,




--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Any-issues-with-repartition-tp13462p15674.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Re: Any issues with repartition?

Posted by Akhil Das <ak...@sigmoidanalytics.com>.
What is your cluster setup? And how much memory are you allocating to the
executor?

Thanks
Best Regards

On Fri, Oct 3, 2014 at 7:52 AM, jamborta <ja...@gmail.com> wrote:

> Hi Arun,
>
> Have you found a solution? Seems that I have the same problem.
>
> thanks,
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Any-issues-with-repartition-tp13462p15654.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
> For additional commands, e-mail: user-help@spark.apache.org
>
>

Re: Any issues with repartition?

Posted by jamborta <ja...@gmail.com>.
Hi Arun,

Have you found a solution? Seems that I have the same problem.

thanks,



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Any-issues-with-repartition-tp13462p15654.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Re: Any issues with repartition?

Posted by Arun Ahuja <aa...@gmail.com>.
Upgrading to Spark 1.1 does not seem to resolve this issue.  Examples of
exceptions in the log include:

java.io.IOException: sendMessageReliably failed without being ACK'd
        at
org.apache.spark.network.Connection.callOnExceptionCallback(Connection.scala:147)

14/09/08 15:26:09 ERROR SendingConnection: Exception while reading
SendingConnection to ConnectionManagerId
java.nio.channels.ClosedChannelException



On Mon, Sep 8, 2014 at 11:27 AM, Arun Ahuja <aa...@gmail.com> wrote:

> Haven't found a way to get past this yet, any ideas?
>
> Also any ideas what "FetchFailed(null, shuffleId=0, mapId=-1,
> reduceId=65)" would mean if I see that in the UI? That's what's in the task
> failure message in the Spark UI.
>
> On Thu, Sep 4, 2014 at 3:06 PM, Arun Ahuja <aa...@gmail.com> wrote:
>
>> Also just to provide some more information, out of about 7 runs 2 did
>> work successfully - not sure why though?
>>
>> Also there are also exceptions related to
>>
>> 14/09/04 15:02:18 WARN NewHadoopRDD: Exception in RecordReader.close()
>> java.io.IOException: Filesystem closed
>>         at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:689)
>>         at
>> org.apache.hadoop.hdfs.DFSInputStream.close(DFSInputStream.java:617)
>>         at java.io.FilterInputStream.close(FilterInputStream.java:181)
>>         at org.apache.hadoop.util.LineReader.close(LineReader.java:150)
>>         at
>> org.apache.hadoop.mapreduce.lib.input.LineRecordReader.close(LineRecordReader.java:193)
>>         at org.apache.spark.rdd.NewHadoopRDD$$anon$1.org
>> $apache$spark$rdd$NewHadoopRDD$$anon$$close(NewHadoopRDD.scala:138)
>>
>>
>> Perhaps that is just this issue:
>> https://issues.apache.org/jira/browse/SPARK-3052
>>
>> Also in the end it says:
>>
>> 14/09/04 15:02:09 WARN TaskSetManager: Loss was due to fetch failure from
>> BlockManagerId
>>
>> And I see this exception:
>>
>> 14/09/04 15:02:01 ERROR BlockFetcherIterator$BasicBlockFetcherIterator:
>> Could not get block(s) from ConnectionManagerId
>>
>>
>>
>> On Thu, Sep 4, 2014 at 2:59 PM, Arun Ahuja <aa...@gmail.com> wrote:
>>
>>> I simplified my program dramatically, but still see issues:
>>>
>>> Code:
>>>
>>> [image: Inline image 2]
>>>
>>> Spark UI:
>>> [image: Inline image 1]
>>>
>>> The first *count* operation does print, but the one after the
>>> repartition does not.  There are 138,305,035 records.
>>>
>>>
>>> From just those four lines of code, I see the following exceptions:
>>>
>>>
>>> java.io.IOException: Broken pipe
>>>         at sun.nio.ch.FileDispatcherImpl.$$YJP$$write0(Native Method)
>>>         at sun.nio.ch.FileDispatcherImpl.write0(FileDispatcherImpl.java)
>>>
>>>
>>> 14/09/04 14:54:19 INFO ConnectionManager: key already cancelled ?
>>> sun.nio.ch.SelectionKeyImpl@70fa0be9
>>> java.nio.channels.CancelledKeyException
>>>         at
>>> org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:363)
>>>
>>> 14/09/04 14:54:44 ERROR ExecutorUncaughtExceptionHandler: Uncaught
>>> exception in thread Thread[pool-1-thread-1,5,main]
>>> java.lang.Error: java.lang.Exception: Could not find reference for
>>> received ack message 4215
>>>         at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1151)
>>>
>>>
>>>
>>>
>>> On Thu, Sep 4, 2014 at 11:50 AM, Arun Ahuja <aa...@gmail.com> wrote:
>>>
>>>> I wanted to know if there were any known issues with repartition?
>>>>
>>>> I have two RDD's loaded in my code, both are loaded into 133
>>>> partitions.  Before processing I want to repartition both to more (let's
>>>> say 1000) and then just count the records.  This task seems to be failing
>>>> for me (Spark 1.0)
>>>>
>>>> Code:
>>>> [image: Inline image 1]
>>>>
>>>> Spark UI:
>>>>
>>>>
>>>> [image: Inline image 2]
>>>>
>>>> My questions are 1) Why are there four repartition stages for two
>>>> calls? 2) Why are there more tasks than listed in those stages but none
>>>> listed as failed?
>>>>
>>>>
>>>> I see tons of exceptions in the logs, but none seem particularly
>>>> helpful:
>>>>
>>>> 14/09/04 11:34:24 INFO ConnectionManager: key already cancelled ?
>>>> sun.nio.ch.SelectionKeyImpl@646de5cc
>>>> java.nio.channels.CancelledKeyException
>>>>         at
>>>> org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:287)
>>>>
>>>> java.nio.channels.UnresolvedAddressException
>>>>         at sun.nio.ch.Net.checkAddress(Net.java:127)
>>>>
>>>> java.net.ConnectException: Connection timed out
>>>>         at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
>>>>         at
>>>> sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:735)
>>>>
>>>> java.nio.channels.NoConnectionPendingException
>>>>         at
>>>> sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:720)
>>>>
>>>> I can run other spark jobs to completion, so it does not appear to be a
>>>> true network issue.
>>>>
>>>> Thanks,
>>>> Arun
>>>>
>>>>
>>>>
>>>>
>>>>
>>>
>>
>

Re: Any issues with repartition?

Posted by Arun Ahuja <aa...@gmail.com>.
Haven't found a way to get past this yet, any ideas?

Also, any ideas what "FetchFailed(null, shuffleId=0, mapId=-1, reduceId=65)"
would mean if I see that in the UI? That's what's in the task failure
message in the Spark UI.

On Thu, Sep 4, 2014 at 3:06 PM, Arun Ahuja <aa...@gmail.com> wrote:

> Also just to provide some more information, out of about 7 runs 2 did work
> successfully - not sure why though?
>
> Also there are also exceptions related to
>
> 14/09/04 15:02:18 WARN NewHadoopRDD: Exception in RecordReader.close()
> java.io.IOException: Filesystem closed
>         at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:689)
>         at
> org.apache.hadoop.hdfs.DFSInputStream.close(DFSInputStream.java:617)
>         at java.io.FilterInputStream.close(FilterInputStream.java:181)
>         at org.apache.hadoop.util.LineReader.close(LineReader.java:150)
>         at
> org.apache.hadoop.mapreduce.lib.input.LineRecordReader.close(LineRecordReader.java:193)
>         at org.apache.spark.rdd.NewHadoopRDD$$anon$1.org
> $apache$spark$rdd$NewHadoopRDD$$anon$$close(NewHadoopRDD.scala:138)
>
>
> Perhaps that is just this issue:
> https://issues.apache.org/jira/browse/SPARK-3052
>
> Also in the end it says:
>
> 14/09/04 15:02:09 WARN TaskSetManager: Loss was due to fetch failure from
> BlockManagerId
>
> And I see this exception:
>
> 14/09/04 15:02:01 ERROR BlockFetcherIterator$BasicBlockFetcherIterator:
> Could not get block(s) from ConnectionManagerId
>
>
>
> On Thu, Sep 4, 2014 at 2:59 PM, Arun Ahuja <aa...@gmail.com> wrote:
>
>> I simplified my program dramatically, but still see issues:
>>
>> Code:
>>
>> [image: Inline image 2]
>>
>> Spark UI:
>> [image: Inline image 1]
>>
>> The first *count* operation does print, but the one after the
>> repartition does not.  There are 138,305,035 records.
>>
>>
>> From just those four lines of code, I see the following exceptions:
>>
>>
>> java.io.IOException: Broken pipe
>>         at sun.nio.ch.FileDispatcherImpl.$$YJP$$write0(Native Method)
>>         at sun.nio.ch.FileDispatcherImpl.write0(FileDispatcherImpl.java)
>>
>>
>> 14/09/04 14:54:19 INFO ConnectionManager: key already cancelled ?
>> sun.nio.ch.SelectionKeyImpl@70fa0be9
>> java.nio.channels.CancelledKeyException
>>         at
>> org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:363)
>>
>> 14/09/04 14:54:44 ERROR ExecutorUncaughtExceptionHandler: Uncaught
>> exception in thread Thread[pool-1-thread-1,5,main]
>> java.lang.Error: java.lang.Exception: Could not find reference for
>> received ack message 4215
>>         at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1151)
>>
>>
>>
>>
>> On Thu, Sep 4, 2014 at 11:50 AM, Arun Ahuja <aa...@gmail.com> wrote:
>>
>>> I wanted to know if there were any known issues with repartition?
>>>
>>> I have two RDD's loaded in my code, both are loaded into 133 partitions.
>>>  Before processing I want to repartition both to more (let's say 1000) and
>>> then just count the records.  This task seems to be failing for me (Spark
>>> 1.0)
>>>
>>> Code:
>>> [image: Inline image 1]
>>>
>>> Spark UI:
>>>
>>>
>>> [image: Inline image 2]
>>>
>>> My questions are 1) Why are there four repartition stages for two calls?
>>> 2) Why are there more tasks than listed in those stages but none listed as
>>> failed?
>>>
>>>
>>> I see tons of exceptions in the logs, but none seem particularly helpful:
>>>
>>> 14/09/04 11:34:24 INFO ConnectionManager: key already cancelled ?
>>> sun.nio.ch.SelectionKeyImpl@646de5cc
>>> java.nio.channels.CancelledKeyException
>>>         at
>>> org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:287)
>>>
>>> java.nio.channels.UnresolvedAddressException
>>>         at sun.nio.ch.Net.checkAddress(Net.java:127)
>>>
>>> java.net.ConnectException: Connection timed out
>>>         at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
>>>         at
>>> sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:735)
>>>
>>> java.nio.channels.NoConnectionPendingException
>>>         at
>>> sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:720)
>>>
>>> I can run other spark jobs to completion, so it does not appear to be a
>>> true network issue.
>>>
>>> Thanks,
>>> Arun
>>>
>>>
>>>
>>>
>>>
>>
>

Re: Any issues with repartition?

Posted by Arun Ahuja <aa...@gmail.com>.
Also, just to provide some more information: out of about 7 runs, 2 did work
successfully - not sure why, though.

Also there are also exceptions related to

14/09/04 15:02:18 WARN NewHadoopRDD: Exception in RecordReader.close()
java.io.IOException: Filesystem closed
        at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:689)
        at
org.apache.hadoop.hdfs.DFSInputStream.close(DFSInputStream.java:617)
        at java.io.FilterInputStream.close(FilterInputStream.java:181)
        at org.apache.hadoop.util.LineReader.close(LineReader.java:150)
        at
org.apache.hadoop.mapreduce.lib.input.LineRecordReader.close(LineRecordReader.java:193)
        at org.apache.spark.rdd.NewHadoopRDD$$anon$1.org
$apache$spark$rdd$NewHadoopRDD$$anon$$close(NewHadoopRDD.scala:138)


Perhaps that is just this issue:
https://issues.apache.org/jira/browse/SPARK-3052

Also in the end it says:

14/09/04 15:02:09 WARN TaskSetManager: Loss was due to fetch failure from
BlockManagerId

And I see this exception:

14/09/04 15:02:01 ERROR BlockFetcherIterator$BasicBlockFetcherIterator:
Could not get block(s) from ConnectionManagerId



On Thu, Sep 4, 2014 at 2:59 PM, Arun Ahuja <aa...@gmail.com> wrote:

> I simplified my program dramatically, but still see issues:
>
> Code:
>
> [image: Inline image 2]
>
> Spark UI:
> [image: Inline image 1]
>
> The first *count* operation does print, but the one after the repartition
> does not.  There are 138,305,035 records.
>
>
> From just those four lines of code, I see the following exceptions:
>
>
> java.io.IOException: Broken pipe
>         at sun.nio.ch.FileDispatcherImpl.$$YJP$$write0(Native Method)
>         at sun.nio.ch.FileDispatcherImpl.write0(FileDispatcherImpl.java)
>
>
> 14/09/04 14:54:19 INFO ConnectionManager: key already cancelled ?
> sun.nio.ch.SelectionKeyImpl@70fa0be9
> java.nio.channels.CancelledKeyException
>         at
> org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:363)
>
> 14/09/04 14:54:44 ERROR ExecutorUncaughtExceptionHandler: Uncaught
> exception in thread Thread[pool-1-thread-1,5,main]
> java.lang.Error: java.lang.Exception: Could not find reference for
> received ack message 4215
>         at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1151)
>
>
>
>
> On Thu, Sep 4, 2014 at 11:50 AM, Arun Ahuja <aa...@gmail.com> wrote:
>
>> I wanted to know if there were any known issues with repartition?
>>
>> I have two RDD's loaded in my code, both are loaded into 133 partitions.
>>  Before processing I want to repartition both to more (let's say 1000) and
>> then just count the records.  This task seems to be failing for me (Spark
>> 1.0)
>>
>> Code:
>> [image: Inline image 1]
>>
>> Spark UI:
>>
>>
>> [image: Inline image 2]
>>
>> My questions are 1) Why are there four repartition stages for two calls?
>> 2) Why are there more tasks than listed in those stages but none listed as
>> failed?
>>
>>
>> I see tons of exceptions in the logs, but none seem particularly helpful:
>>
>> 14/09/04 11:34:24 INFO ConnectionManager: key already cancelled ?
>> sun.nio.ch.SelectionKeyImpl@646de5cc
>> java.nio.channels.CancelledKeyException
>>         at
>> org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:287)
>>
>> java.nio.channels.UnresolvedAddressException
>>         at sun.nio.ch.Net.checkAddress(Net.java:127)
>>
>> java.net.ConnectException: Connection timed out
>>         at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
>>         at
>> sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:735)
>>
>> java.nio.channels.NoConnectionPendingException
>>         at
>> sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:720)
>>
>> I can run other spark jobs to completion, so it does not appear to be a
>> true network issue.
>>
>> Thanks,
>> Arun
>>
>>
>>
>>
>>
>

Re: Any issues with repartition?

Posted by Arun Ahuja <aa...@gmail.com>.
I simplified my program dramatically, but still see issues:

Code:

[image: Inline image 2]
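
(The image does not come through in plain text; based on the description
below, the four lines were roughly of this shape - names and the input path
are assumptions:)

rdd = sc.textFile("hdfs:///data/input")    # assumed input; ~138M records
print(rdd.count())                          # this first count prints
repartitioned = rdd.repartition(1000)       # assumed target partition count
print(repartitioned.count())                # this one does not print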

Spark UI:
[image: Inline image 1]

The first *count* operation does print, but the one after the repartition
does not.  There are 138,305,035 records.


From just those four lines of code, I see the following exceptions:


java.io.IOException: Broken pipe
        at sun.nio.ch.FileDispatcherImpl.$$YJP$$write0(Native Method)
        at sun.nio.ch.FileDispatcherImpl.write0(FileDispatcherImpl.java)


14/09/04 14:54:19 INFO ConnectionManager: key already cancelled ?
sun.nio.ch.SelectionKeyImpl@70fa0be9
java.nio.channels.CancelledKeyException
        at
org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:363)

14/09/04 14:54:44 ERROR ExecutorUncaughtExceptionHandler: Uncaught
exception in thread Thread[pool-1-thread-1,5,main]
java.lang.Error: java.lang.Exception: Could not find reference for received
ack message 4215
        at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1151)




On Thu, Sep 4, 2014 at 11:50 AM, Arun Ahuja <aa...@gmail.com> wrote:

> I wanted to know if there were any known issues with repartition?
>
> I have two RDD's loaded in my code, both are loaded into 133 partitions.
>  Before processing I want to repartition both to more (let's say 1000) and
> then just count the records.  This task seems to be failing for me (Spark
> 1.0)
>
> Code:
> [image: Inline image 1]
>
> Spark UI:
>
>
> [image: Inline image 2]
>
> My questions are 1) Why are there four repartition stages for two calls?
> 2) Why are there more tasks than listed in those stages but none listed as
> failed?
>
>
> I see tons of exceptions in the logs, but none seem particularly helpful:
>
> 14/09/04 11:34:24 INFO ConnectionManager: key already cancelled ?
> sun.nio.ch.SelectionKeyImpl@646de5cc
> java.nio.channels.CancelledKeyException
>         at
> org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:287)
>
> java.nio.channels.UnresolvedAddressException
>         at sun.nio.ch.Net.checkAddress(Net.java:127)
>
> java.net.ConnectException: Connection timed out
>         at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
>         at
> sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:735)
>
> java.nio.channels.NoConnectionPendingException
>         at
> sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:720)
>
> I can run other spark jobs to completion, so it does not appear to be a true
> network issue.
>
> Thanks,
> Arun
>
>
>
>
>