Posted to user@spark.apache.org by Ravi Pandya <ra...@iecommerce.com> on 2014/07/14 22:09:12 UTC

Memory & compute-intensive tasks

I'm trying to run a job that includes an invocation of a memory &
compute-intensive multithreaded C++ program, and so I'd like to run one
task per physical node. Using rdd.coalesce(# nodes) seems to just allocate
one task per core, and so runs out of memory on the node. Is there any way
to give the scheduler a hint that the task uses lots of memory and cores so
it spreads it out more evenly?

Thanks,

Ravi Pandya
Microsoft Research

Re: Memory & compute-intensive tasks

Posted by rpandya <ra...@iecommerce.com>.
This one turned out to be another problem with my app configuration, not with
Spark. The compute task was dependent on the local filesystem, and config
errors on 8 of 10 of the nodes made them fail early. The Spark wrapper was
not checking the process exit value, so it appeared as if they were
producing only a little data.
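
For anyone hitting the same symptom, here is a minimal sketch of the missing check. It is written in Python for brevity and is not the actual wrapper from this job; the function name `run_external` is an illustrative assumption.

```python
import subprocess
import sys


def run_external(cmd, stdin_bytes=b""):
    """Run an external command and fail loudly on a non-zero exit status."""
    result = subprocess.run(
        cmd,
        input=stdin_bytes,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    if result.returncode != 0:
        # Surface the failure instead of returning whatever partial output exists.
        message = result.stderr.decode(errors="replace").strip()
        raise RuntimeError(
            "command %r exited with status %d: %s" % (cmd, result.returncode, message)
        )
    return result.stdout
```

With a check like this, a node whose external program dies early produces a task failure rather than a suspiciously small amount of output.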

Ravi



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Memory-compute-intensive-tasks-tp9643p11375.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: Memory & compute-intensive tasks

Posted by Matei Zaharia <ma...@gmail.com>.
Is data being cached? It might be that those two nodes started first and did the first pass of the data, so it's all on them. It's kind of ugly but you can add a Thread.sleep when your program starts to wait for nodes to come up.
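
A slightly less blunt variant of the fixed sleep is to poll until enough nodes have registered, with a timeout. The sketch below is plain Python and only shows the polling loop; `count_registered_nodes` in the comment is hypothetical, and how you would obtain that number from your cluster is not covered here.

```python
import time


def wait_until(predicate, timeout_s=60.0, interval_s=1.0):
    """Poll predicate() until it returns True or timeout_s elapses.

    Returns True if the condition was met, False on timeout.
    """
    deadline = time.monotonic() + timeout_s
    while True:
        if predicate():
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval_s)


# Hypothetical usage, assuming some count_registered_nodes() helper exists:
#   wait_until(lambda: count_registered_nodes() >= 10, timeout_s=120.0)
```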

Also, have you checked the application web UI at http://<driver node>:4040 while the app is running? It shows where each task ran and where each partition of data is stored, which might reveal that, e.g., some tasks take much longer than others due to data skew.

Matei

On July 29, 2014 at 10:13:14 AM, rpandya (ravi@iecommerce.com) wrote:

> OK, I did figure this out. I was running the app (avocado) using
> spark-submit, when it was actually designed to take command line arguments
> to connect to a spark cluster. Since I didn't provide any such arguments, it
> started a nested local Spark cluster *inside* the YARN Spark executor and so
> of course everything ran on one node. If I spin up a Spark cluster manually
> and provide the spark master URI to avocado, it works fine.
>
> Now, I've tried running a reasonable-sized job through (400GB of data on 10
> HDFS/Spark nodes), and the partitioning is strange. Eight nodes get almost
> nothing, and the other two nodes each get half the work. This happens
> whether I use coalesce with shuffle=true or false before the work stage.
> (Though if I use shuffle=true, it creates 3000 tasks to do the shuffle, and
> still ends up with this skewed distribution!) Any suggestions on how to
> figure out what's going on?
>
> Thanks,
>
> Ravi




Re: Memory & compute-intensive tasks

Posted by rpandya <ra...@iecommerce.com>.
OK, I did figure this out. I was running the app (avocado) using
spark-submit, when it was actually designed to take command line arguments
to connect to a spark cluster. Since I didn't provide any such arguments, it
started a nested local Spark cluster *inside* the YARN Spark executor and so
of course everything ran on one node. If I spin up a Spark cluster manually
and provide the spark master URI to avocado, it works fine.

Now, I've tried running a reasonable-sized job through (400GB of data on 10
HDFS/Spark nodes), and the partitioning is strange. Eight nodes get almost
nothing, and the other two nodes each get half the work. This happens
whether I use coalesce with shuffle=true or false before the work stage.
(Though if I use shuffle=true, it creates 3000 tasks to do the shuffle, and
still ends up with this skewed distribution!) Any suggestions on how to
figure out what's going on?

Thanks,

Ravi




Re: Memory & compute-intensive tasks

Posted by rpandya <ra...@iecommerce.com>.
I also tried increasing --num-executors to numNodes * coresPerNode and using
coalesce(numNodes*10,true), and it still ran all the tasks on one node. It
seems like it is placing all the executors on one node (though not always
the same node, which indicates it is aware of more than one!). I'm using
spark-submit --master yarn --deploy-mode cluster with spark-1.0.1 built for
hadoop 2.4 on HDP 2.1/Hadoop 2.4.

There's clearly just something wrong with my Hadoop configuration, or in how
I'm submitting my spark job - any suggestions?

Thanks,

Ravi




Re: Memory & compute-intensive tasks

Posted by rpandya <ra...@iecommerce.com>.
Hi Matei-

Changing to coalesce(numNodes, true) still runs all partitions on a single
node, which I verified by printing the hostname before I exec the external
process.
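
That hostname check can be made a little more systematic by tallying the hostnames reported by all tasks. A minimal sketch in plain Python; how the hostnames get back to the driver (for example as each task's first output record) is an assumption and not shown, and the function names are illustrative.

```python
import socket
from collections import Counter


def current_host():
    """Hostname of the machine this code is running on."""
    return socket.gethostname()


def tasks_per_host(hostnames):
    """Count how many tasks reported each hostname."""
    return Counter(hostnames)


def is_skewed(counts, expected_hosts):
    """True if fewer hosts than expected actually ran any task."""
    return len(counts) < expected_hosts
```

If every task reports the same name, `tasks_per_host` has a single entry and `is_skewed(counts, numNodes)` is true, which matches what I am seeing.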





Re: Memory & compute-intensive tasks

Posted by Liquan Pei <li...@gmail.com>.
Hi Ravi,

I have seen a similar issue before. You can try setting
fs.hdfs.impl.disable.cache to true in your Hadoop configuration. For
example, if your Hadoop Configuration object is hadoopConf, you can call
hadoopConf.setBoolean("fs.hdfs.impl.disable.cache", true)

Let me know if that helps.

Best,
Liquan


On Wed, Jul 16, 2014 at 4:56 PM, rpandya <ra...@iecommerce.com> wrote:

> Matei - I tried using coalesce(numNodes, true), but it then seemed to run
> too few SNAP tasks - only 2 or 3 when I had specified 46. The job failed,
> perhaps for unrelated reasons, with some odd exceptions in the log (at the
> end of this message). But I really don't want to force data movement
> between nodes. The input data is in HDFS and should already be somewhat
> balanced among the nodes. We've run this scenario using the simple
> "hadoop jar" runner and a custom format jar to break the input into 8-line
> chunks (paired FASTQ). Ideally I'd like Spark to do the minimum data
> movement to balance the work, feeding each task mostly from data local to
> that node.
>
> Daniel - that's a good thought, I could invoke a small stub for each task
> that talks to a single local daemon process over a socket, and serializes
> all the tasks on a given machine.
>
> Thanks,
>
> Ravi
>
> P.S. Log exceptions:
>
> 14/07/15 17:02:00 WARN yarn.ApplicationMaster: Unable to retrieve SparkContext in spite of waiting for 100000, maxNumTries = 10
> Exception in thread "main" java.lang.NullPointerException
>         at org.apache.spark.deploy.yarn.ApplicationMaster.waitForSparkContextInitialized(ApplicationMaster.scala:233)
>         at org.apache.spark.deploy.yarn.ApplicationMaster.run(ApplicationMaster.scala:110)
>
> ...and later...
>
> 14/07/15 17:11:07 ERROR yarn.ApplicationMaster: RECEIVED SIGNAL 15: SIGTERM
> 14/07/15 17:11:07 INFO yarn.ApplicationMaster: AppMaster received a signal.
> 14/07/15 17:11:07 WARN rdd.NewHadoopRDD: Exception in RecordReader.close()
> java.io.IOException: Filesystem closed
>         at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:707)
>         at org.apache.hadoop.hdfs.DFSInputStream.close(DFSInputStream.java:619)



-- 
Liquan Pei
Department of Physics
University of Massachusetts Amherst

Re: Memory & compute-intensive tasks

Posted by rpandya <ra...@iecommerce.com>.
Matei - I tried using coalesce(numNodes, true), but it then seemed to run too
few SNAP tasks - only 2 or 3 when I had specified 46. The job failed,
perhaps for unrelated reasons, with some odd exceptions in the log (at the
end of this message). But I really don't want to force data movement between
nodes. The input data is in HDFS and should already be somewhat balanced
among the nodes. We've run this scenario using the simple "hadoop jar"
runner and a custom format jar to break the input into 8-line chunks (paired
FASTQ). Ideally I'd like Spark to do the minimum data movement to balance
the work, feeding each task mostly from data local to that node.

Daniel - that's a good thought, I could invoke a small stub for each task
that talks to a single local daemon process over a socket, and serializes all
the tasks on a given machine.
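
A rough sketch of that stub-plus-daemon idea, in Python for brevity. Everything here is an assumption for illustration: the line-based protocol, the names `start_daemon` and `submit`, and the upper-casing that stands in for the real work. It relies on the fact that the standard library's `socketserver.TCPServer` serves requests strictly one at a time.

```python
import socket
import socketserver
import threading


class SerialHandler(socketserver.StreamRequestHandler):
    """Handles one request; the counters only exist to show serialization."""

    active = 0
    max_active = 0

    def handle(self):
        cls = type(self)
        cls.active += 1
        cls.max_active = max(cls.max_active, cls.active)
        try:
            line = self.rfile.readline().rstrip(b"\n")
            # Placeholder for the real memory-hungry work.
            self.wfile.write(line.upper() + b"\n")
        finally:
            cls.active -= 1


def start_daemon(host="127.0.0.1", port=0):
    """Start the per-node daemon; port=0 lets the OS pick a free port."""
    # TCPServer is single-threaded, so requests are served one after another.
    server = socketserver.TCPServer((host, port), SerialHandler)
    threading.Thread(target=server.serve_forever, daemon=True).start()
    return server


def submit(address, payload):
    """Stub called from each task: send one line, block until the reply."""
    with socket.create_connection(address) as conn:
        conn.sendall(payload + b"\n")
        with conn.makefile("rb") as reply:
            return reply.readline().rstrip(b"\n")
```

Each task would call `submit(server.server_address, data)` and simply wait its turn, so only one heavy job runs on the machine no matter how many tasks the scheduler places there.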

Thanks,

Ravi

P.S. Log exceptions:

14/07/15 17:02:00 WARN yarn.ApplicationMaster: Unable to retrieve SparkContext in spite of waiting for 100000, maxNumTries = 10
Exception in thread "main" java.lang.NullPointerException
	at org.apache.spark.deploy.yarn.ApplicationMaster.waitForSparkContextInitialized(ApplicationMaster.scala:233)
	at org.apache.spark.deploy.yarn.ApplicationMaster.run(ApplicationMaster.scala:110)

...and later...

14/07/15 17:11:07 ERROR yarn.ApplicationMaster: RECEIVED SIGNAL 15: SIGTERM
14/07/15 17:11:07 INFO yarn.ApplicationMaster: AppMaster received a signal.
14/07/15 17:11:07 WARN rdd.NewHadoopRDD: Exception in RecordReader.close()
java.io.IOException: Filesystem closed
	at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:707)
	at org.apache.hadoop.hdfs.DFSInputStream.close(DFSInputStream.java:619)





Re: Memory & compute-intensive tasks

Posted by Daniel Siegmann <da...@velos.io>.
Depending on how your C++ program is designed, maybe you can feed the data
from multiple partitions into the same process? Getting the results back
might be tricky. But that may be the only way to guarantee you're only
using one invocation per node.
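
To make the "getting the results back" part concrete, here is one possible shape, sketched in Python. It assumes the external program can speak a simple line protocol in which every record is tagged with its partition id; that protocol, the `SharedWorker` class, and the toy worker that just reverses strings are all hypothetical and not something the real C++ program is known to support.

```python
import subprocess
import sys
import threading

# Stand-in for the external program (hypothetical protocol): it reads
# "partition_id<TAB>record" lines and answers "partition_id<TAB>result".
WORKER_SOURCE = r'''
import sys
for line in sys.stdin:
    pid, record = line.rstrip("\n").split("\t", 1)
    sys.stdout.write(pid + "\t" + record[::-1] + "\n")
    sys.stdout.flush()
'''


class SharedWorker:
    """One long-lived child process shared by every partition on a node."""

    def __init__(self):
        self._proc = subprocess.Popen(
            [sys.executable, "-c", WORKER_SOURCE],
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            universal_newlines=True,
            bufsize=1,
        )
        self._lock = threading.Lock()

    def process(self, partition_id, record):
        """Send one record and return (partition_id, result) from the reply."""
        with self._lock:  # only one request in flight at a time
            self._proc.stdin.write("%d\t%s\n" % (partition_id, record))
            self._proc.stdin.flush()
            reply = self._proc.stdout.readline().rstrip("\n")
        pid, result = reply.split("\t", 1)
        return int(pid), result

    def close(self):
        """Signal end of input and return the child's exit status."""
        self._proc.stdin.close()
        status = self._proc.wait()
        self._proc.stdout.close()
        return status
```

Tagging each reply with the partition id is what lets the caller route results back to the right partition; the lock keeps requests from different partitions from interleaving on the pipe.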


On Mon, Jul 14, 2014 at 5:12 PM, Matei Zaharia <ma...@gmail.com>
wrote:

> I think coalesce with shuffle=true will force it to have one task per
> node. Without that, it might be that due to data locality it decides to
> launch multiple ones on the same node even though the total # of tasks is
> equal to the # of nodes.
>
> If this is the *only* thing you run on the cluster, you could also
> configure the Workers to only report one core by manually launching the
> spark.deploy.worker.Worker process with that flag (see
> http://spark.apache.org/docs/latest/spark-standalone.html).
>
> Matei
>
> On Jul 14, 2014, at 1:59 PM, Daniel Siegmann <da...@velos.io>
> wrote:
>
> I don't have a solution for you (sorry), but do note that
> rdd.coalesce(numNodes) keeps data on the same nodes where it was. If you
> set shuffle=true then it should repartition and redistribute the data.
> But it uses the hash partitioner according to the ScalaDoc - I don't know
> of any way to supply a custom partitioner.
>
>
> On Mon, Jul 14, 2014 at 4:09 PM, Ravi Pandya <ra...@iecommerce.com> wrote:
>
>> I'm trying to run a job that includes an invocation of a memory &
>> compute-intensive multithreaded C++ program, and so I'd like to run one
>> task per physical node. Using rdd.coalesce(# nodes) seems to just allocate
>> one task per core, and so runs out of memory on the node. Is there any way
>> to give the scheduler a hint that the task uses lots of memory and cores so
>> it spreads it out more evenly?
>>
>> Thanks,
>>
>> Ravi Pandya
>> Microsoft Research
>>


-- 
Daniel Siegmann, Software Developer
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001
E: daniel.siegmann@velos.io W: www.velos.io

Re: Memory & compute-intensive tasks

Posted by Matei Zaharia <ma...@gmail.com>.
I think coalesce with shuffle=true will force it to have one task per node. Without that, it might be that due to data locality it decides to launch multiple ones on the same node even though the total # of tasks is equal to the # of nodes.

If this is the *only* thing you run on the cluster, you could also configure the Workers to only report one core by manually launching the spark.deploy.worker.Worker process with that flag (see http://spark.apache.org/docs/latest/spark-standalone.html).

Matei

On Jul 14, 2014, at 1:59 PM, Daniel Siegmann <da...@velos.io> wrote:

> I don't have a solution for you (sorry), but do note that rdd.coalesce(numNodes) keeps data on the same nodes where it was. If you set shuffle=true then it should repartition and redistribute the data. But it uses the hash partitioner according to the ScalaDoc - I don't know of any way to supply a custom partitioner.
> 
> 
> On Mon, Jul 14, 2014 at 4:09 PM, Ravi Pandya <ra...@iecommerce.com> wrote:
> I'm trying to run a job that includes an invocation of a memory & compute-intensive multithreaded C++ program, and so I'd like to run one task per physical node. Using rdd.coalesce(# nodes) seems to just allocate one task per core, and so runs out of memory on the node. Is there any way to give the scheduler a hint that the task uses lots of memory and cores so it spreads it out more evenly?
> 
> Thanks,
> 
> Ravi Pandya
> Microsoft Research
> 
> 
> 


Re: Memory & compute-intensive tasks

Posted by Daniel Siegmann <da...@velos.io>.
I don't have a solution for you (sorry), but do note that
rdd.coalesce(numNodes) keeps data on the same nodes where it was. If you
set shuffle=true then it should repartition and redistribute the data. But
it uses the hash partitioner according to the ScalaDoc - I don't know of
any way to supply a custom partitioner.
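
To illustrate the difference, here is a toy model in plain Python. It is not Spark's implementation: `merge_partitions` merely stands in for merging whole partitions without moving individual records, and `hash_partition` stands in for redistributing records by hash; both names are made up for this sketch.

```python
def merge_partitions(partitions, num_groups):
    """Merge whole partitions into num_groups groups; no record is re-keyed."""
    groups = [[] for _ in range(num_groups)]
    for i, part in enumerate(partitions):
        # Neighbouring input partitions land in the same output group.
        groups[i * num_groups // len(partitions)].extend(part)
    return groups


def hash_partition(keys, num_partitions):
    """Assign each key to partition hash(key) mod num_partitions."""
    partitions = [[] for _ in range(num_partitions)]
    for key in keys:
        # Python's % is non-negative for a positive modulus.
        partitions[hash(key) % num_partitions].append(key)
    return partitions


# Four input partitions, the first one holding most of the data.
skewed = [list(range(0, 90)), list(range(90, 93)),
          list(range(93, 97)), list(range(97, 100))]
merged_sizes = [len(g) for g in merge_partitions(skewed, 2)]
all_keys = [k for part in skewed for k in part]
hashed_sizes = [len(p) for p in hash_partition(all_keys, 2)]
```

In this toy case merging keeps the imbalance (93 and 7 records), while hashing the integer keys gives 50 and 50, at the cost of moving records between partitions.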


On Mon, Jul 14, 2014 at 4:09 PM, Ravi Pandya <ra...@iecommerce.com> wrote:

> I'm trying to run a job that includes an invocation of a memory &
> compute-intensive multithreaded C++ program, and so I'd like to run one
> task per physical node. Using rdd.coalesce(# nodes) seems to just allocate
> one task per core, and so runs out of memory on the node. Is there any way
> to give the scheduler a hint that the task uses lots of memory and cores so
> it spreads it out more evenly?
>
> Thanks,
>
> Ravi Pandya
> Microsoft Research
>


