Posted to user@spark.apache.org by Andras Nemeth <an...@lynxanalytics.com> on 2014/04/10 16:11:59 UTC

Fwd: Spark - ready for prime time?

Hello Spark Users,

With the recent graduation of Spark to a top level project (grats, btw!),
maybe this is a well-timed question. :)

We are at the very beginning of a large scale big data project and after
two months of exploration work we'd like to settle on the technologies to
use, roll up our sleeves and start to build the system.

Spark is one of the forerunners for our technology choice.

My question in essence is whether it's a good idea, or whether Spark is
still too 'experimental' to bet our lives (well, the project's life) on it.

The benefits of choosing Spark are numerous and I guess all too obvious for
this audience - e.g. we love its powerful abstraction, ease of development
and the potential for using a single system for serving and manipulating
huge amounts of data.

This email aims to ask about the risks. I list the concrete issues we've
encountered below, but basically my concern boils down to two philosophical
points:
I. Is it too much magic? Lots of things "just work right" in Spark and it's
extremely convenient and efficient when it indeed works. But should we be
worried that customization is hard if the built in behavior is not quite
right for us? Are we to expect hard to track down issues originating from
the black box behind the magic?
II. Is it mature enough? E.g. we've created a pull
request <https://github.com/apache/spark/pull/181> which fixes a problem
that we were very surprised no one ever stumbled upon
before. So that's why I'm asking: is Spark already being used in
professional settings? Can one already trust it to be reasonably bug free
and reliable?

I know I'm asking a biased audience, but that's fine, as I want to be
convinced. :)

So, to the concrete issues. Sorry for the long mail, and let me know if I
should break this out into more threads or if there is some other way to
have this discussion...

1. Memory management
The general direction of these questions is whether it's possible to take
RDD caching related memory management more into our own hands as LRU
eviction is nice most of the time but can be very suboptimal in some of our
use cases.
A. Somehow prioritize cached RDDs, e.g. mark some as "essential" that one
really wants to keep. I'm fine with going down in flames if I mark too much
data essential.
B. Memory "reflection": can you programmatically get the memory size of a
cached RDD and the memory available in total/per executor? (See the sketch
right after this list.) If we could do this we could indirectly avoid
automatic evictions of things we might really want to keep in memory.
C. Evictions caused by RDD partitions on the driver. I had a setup with
huge worker memory and smallish memory on the driver JVM. To my surprise,
the system started to cache RDD partitions on the driver as well. As the
driver ran out of memory I started to see evictions while there was still
plenty of space on the workers. This resulted in lengthy recomputations. Can
this be avoided somehow?
D. Broadcasts. Is it possible to get rid of a broadcast manually, without
waiting for LRU eviction to take care of it? Can you tell the size of a
broadcast programmatically?
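
To make A, B and D a bit more concrete, here is roughly what I'd like to be
able to write. The first three calls are my best guess at existing APIs
(please correct me if the names or availability differ across versions); the
last line is a purely made-up API, just to illustrate point A:

  // B: rough memory introspection from the driver
  val perExecutor = sc.getExecutorMemoryStatus  // executor -> (max, remaining) bytes
  val cachedRdds  = sc.getRDDStorageInfo        // per-RDD memSize, numCachedPartitions

  // D: drop things explicitly instead of waiting for LRU
  someRdd.unpersist()
  // someBroadcast.unpersist()  // does an equivalent exist for broadcasts?

  // A: hypothetical, as far as I know nothing like this exists today
  // importantRdd.persist(StorageLevel.MEMORY_ONLY, priority = Essential)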


2. Akka lost connections
We have quite often experienced lost executors due to akka exceptions -
mostly connection lost or similar. It seems to happen when an executor gets
extremely busy with some CPU intensive work. Our hypothesis is that akka
network threads get starved and the executor fails to respond within
timeout limits. Is this plausible? If yes, what can we do about it?

In general, these are scary errors in the sense that they come from the
very core of the framework and it's hard to link them to something we do in
our own code, and thus hard to find a fix. So a question more for the
community: how often do you end up scratching your head about cases where
spark magic doesn't work perfectly?


3. Recalculation of cached rdds
I see the following scenario happening. I load two RDDs A,B from disk,
cache them and then do some jobs on them, at the very least a count on
each. After these jobs are done I see on the storage panel that 100% of
these RDDs are cached in memory.

Then I create a third RDD C, built by multiple joins and maps from A and B,
also cache it and start a job on C. When I do this I still see A and B
completely cached and also see C slowly getting more and more cached. This
is all well and good, but in the meantime I see stages running on the UI
that point to the code which is used to load A and B. How is this possible?
Am I misunderstanding how cached RDDs should behave?
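
In code, the pattern is roughly this (loadA/loadB/transform stand in for our
real code; A and B are key-value RDDs):

  val a = loadA(sc).cache()
  val b = loadB(sc).cache()
  a.count(); b.count()  // storage panel now shows A and B 100% cached

  val c = a.join(b).map { case (k, (x, y)) => transform(k, x, y) }.cache()
  c.count()             // while this runs I see stages from loadA/loadB again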

And again the general question - how can one debug such issues?

4. Shuffle on disk
Is it true - I couldn't find it in the official docs, but did see this
mentioned in various threads - that shuffle _always_ hits disk?
(Disregarding OS caches.) Why is this the case? Are you planning to add a
function to do shuffle in memory or are there some intrinsic reasons for
this to be impossible?


Sorry again for the giant mail, and thanks for any insights!

Andras

Re: Spark - ready for prime time?

Posted by Brad Miller <bm...@eecs.berkeley.edu>.
I would echo much of what Andrew has said.

I manage a small/medium sized cluster (48 cores, 512G ram, 512G disk
space dedicated to spark, data storage in separate HDFS shares).  I've
been using spark since 0.7, and as with Andrew I've observed
significant and consistent improvements in stability (and in the
PySpark API) since then.  I have run into some trouble with mesos, and
I have run into some trouble when working with data which is large
relative to the size of my cluster (e.g. 100G), but overall it's
worked well and our group is continuing to build on top of spark.

On Thu, Apr 10, 2014 at 9:24 AM, Andrew Ash <an...@andrewash.com> wrote:
> The biggest issue I've come across is that the cluster is somewhat unstable
> when under memory pressure.  Meaning that if you attempt to persist an RDD
> that's too big for memory, even with MEMORY_AND_DISK, you'll often still get
> OOMs.  I had to carefully modify some of the space tuning parameters and GC
> settings to get some jobs to even finish.
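>
> For reference, these are the kinds of knobs I mean (values purely
> illustrative, and some names/mechanisms differ between Spark versions):
>
>   import org.apache.spark.SparkConf
>   val conf = new SparkConf()
>     .set("spark.storage.memoryFraction", "0.4")  // shrink the cache region
>     .set("spark.shuffle.memoryFraction", "0.3")  // more headroom for shuffles
>   // GC flags go into the executor JVM options,
>   // e.g. -XX:+UseConcMarkSweepGC -XX:+PrintGCDetails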
>
> The other issue I've observed is if you group on a key that is highly
> skewed, with a few massively-common keys and a long tail of rare keys, the
> one massive key can be too big for a single machine and again cause OOMs.
>
> I'm hopeful that off-heap caching (Tachyon) could fix some of these issues.
>
> Just my personal experience, but I've observed significant improvements in
> stability since even the 0.7.x days, so I'm confident that things will
> continue to get better as long as people report what they're seeing so it
> can get fixed.
>
> Andrew
>
>
> On Thu, Apr 10, 2014 at 4:08 PM, Alex Boisvert <al...@gmail.com>
> wrote:
>>
>> I'll provide answers from our own experience at Bizo.  We've been using
>> Spark for 1+ year now and have found it generally better than previous
>> approaches (Hadoop + Hive mostly).
>>
>>
>>
>> On Thu, Apr 10, 2014 at 7:11 AM, Andras Nemeth
>> <an...@lynxanalytics.com> wrote:
>>>
>>> I. Is it too much magic? Lots of things "just work right" in Spark and
>>> it's extremely convenient and efficient when it indeed works. But should we
>>> be worried that customization is hard if the built in behavior is not quite
>>> right for us? Are we to expect hard to track down issues originating from
>>> the black box behind the magic?
>>
>>
>> I think it goes back to understanding Spark's architecture, its design
>> constraints and the problems it explicitly set out to address.   If the
>> solution to your problems can be easily formulated in terms of the
>> map/reduce model, then it's a good choice.  You'll want your
>> "customizations" to go with (not against) the grain of the architecture.
>>
>>>
>>> II. Is it mature enough? E.g. we've created a pull request which fixes a
>>> problem that we were very surprised no one ever stumbled upon before. So
>>> that's why I'm asking: is Spark being already used in professional settings?
>>> Can one already trust it being reasonably bug free and reliable?
>>
>>
>> There are lots of ways to use Spark; and not all of the features are
>> necessarily at the same level of maturity.   For instance, we put all the
>> jars on the main classpath so we've never run into the issue your pull
>> request addresses.
>>
>> We definitely use and rely on Spark on a professional basis.  We have 5+
>> Spark jobs running nightly on Amazon's EMR, slicing through GBs of data.
>> Once we got them working with the proper configuration settings, they have
>> been running reliably since.
>>
>> I would characterize our use of Spark as a "better Hadoop", in the sense
>> that we use it for batch processing only, no streaming yet.   We're happy it
>> performs better than Hadoop but we don't require/rely on its memory caching
>> features.  In fact, for most of our jobs it would simplify our lives if
>> Spark didn't cache so many things in memory, since it would make
>> configuration/tuning a lot simpler and jobs would run successfully on the
>> first try instead of having to tweak things (# of partitions and such).
>>
>>> So, to the concrete issues. Sorry for the long mail, and let me know if I
>>> should break this out into more threads or if there is some other way to
>>> have this discussion...
>>>
>>> 1. Memory management
>>> The general direction of these questions is whether it's possible to take
>>> RDD caching related memory management more into our own hands as LRU
>>> eviction is nice most of the time but can be very suboptimal in some of our
>>> use cases.
>>> A. Somehow prioritize cached RDDs, E.g. mark some "essential" that one
>>> really wants to keep. I'm fine with going down in flames if I mark too much
>>> data essential.
>>> B. Memory "reflection": can you pragmatically get the memory size of a
>>> cached rdd and memory sizes available in total/per executor? If we could do
>>> this we could indirectly avoid automatic evictions of things we might really
>>> want to keep in memory.
>>> C. Evictions caused by RDD partitions on the driver. I had a setup with
>>> huge worker memory and smallish memory on the driver JVM. To my surprise,
>>> the system started to cache RDD partitions on the driver as well. As the
>>> driver ran out of memory I started to see evictions while there were still
>>> plenty of space on workers. This resulted in lengthy recomputations. Can
>>> this be avoided somehow?
>>> D. Broadcasts. Is it possible to get rid of a broadcast manually, without
>>> waiting for the LRU eviction taking care of it? Can you tell the size of a
>>> broadcast programmatically?
>>>
>>>
>>> 2. Akka lost connections
>>> We have quite often experienced lost executors due to akka exceptions -
>>> mostly connection lost or similar. It seems to happen when an executor gets
>>> extremely busy with some CPU intensive work. Our hypothesis is that akka
>>> network threads get starved and the executor fails to respond within timeout
>>> limits. Is this plausible? If yes, what can we do with it?
>>
>>
>> We've seen these as well.  In our case, increasing the akka timeouts and
>> framesize helped a lot.
>>
>> e.g. spark.akka.{timeout, askTimeout, lookupTimeout, frameSize}
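>>
>> For example, something along these lines in the SparkConf (our real values
>> differ; these are just to show the shape -- the timeouts are in seconds and
>> frameSize is in MB, if I remember the units correctly):
>>
>>   conf.set("spark.akka.timeout", "300")
>>       .set("spark.akka.askTimeout", "120")
>>       .set("spark.akka.lookupTimeout", "120")
>>       .set("spark.akka.frameSize", "64")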
>>
>>>
>>>
>>> In general, these are scary errors in the sense that they come from the
>>> very core of the framework and it's hard to link it to something we do in
>>> our own code, and thus hard to find a fix. So a question more for the
>>> community: how often do you end up scratching your head about cases where
>>> spark magic doesn't work perfectly?
>>
>>
>> For us, this happens most often for jobs processing TBs of data (instead
>> of GBs)... which is frustrating of course because these jobs cost a lot more
>> in $$$ + time to run/debug/diagnose than smaller jobs.
>>
>> It means we have to comb the logs to understand what happened, interpret
>> stack traces, dump memory / object allocations, read Spark source to
>> formulate hypotheses about what went wrong, and then trial + error to get
>> to a configuration that works.   Again, if Spark had better defaults and a
>> more conservative execution model (rely less on in-memory caching of RDDs
>> and associated metadata, keeping large communication buffers on the heap,
>> etc.), it would definitely simplify our lives.
>>
>> (Though I recognize that others might use Spark very differently and that
>> these defaults and conservative behavior might not please everybody.)
>>
>> Hopefully this is the kind of feedback you were looking for...
>>
>>>
>>> 3. Recalculation of cached rdds
>>> I see the following scenario happening. I load two RDDs A,B from disk,
>>> cache them and then do some jobs on them, at the very least a count on each.
>>> After these jobs are done I see on the storage panel that 100% of these RDDs
>>> are cached in memory.
>>>
>>> Then I create a third RDD C which is created by multiple joins and maps
>>> from A and B, also cache it and start a job on C. When I do this I still see
>>> A and B completely cached and also see C slowly getting more and more
>>> cached. This is all fine and good, but in the meanwhile I see stages running
>>> on the UI that point to code which is used to load A and B. How is this
>>> possible? Am I misunderstanding how cached RDDs should behave?
>>>
>>> And again the general question - how can one debug such issues?
>>>
>>> 4. Shuffle on disk
>>> Is it true - I couldn't find it in official docs, but did see this
>>> mentioned in various threads - that shuffle _always_ hits disk?
>>> (Disregarding OS caches.) Why is this the case? Are you planning to add a
>>> function to do shuffle in memory or are there some intrinsic reasons for
>>> this to be impossible?
>>>
>>>
>>> Sorry again for the giant mail, and thanks for any insights!
>>>
>>> Andras
>>>
>>>
>>
>

Re: Spark - ready for prime time?

Posted by Debasish Das <de...@gmail.com>.
I agree with Andrew... Every time I underestimate the RAM requirement, my
hand calculations always come out way less than what the JVM actually
allocates...

But I guess I will come to understand the Scala/JVM memory overhead as I
accumulate more pain....


On Thu, Apr 10, 2014 at 9:24 AM, Andrew Ash <an...@andrewash.com> wrote:

> The biggest issue I've come across is that the cluster is somewhat
> unstable when under memory pressure.  Meaning that if you attempt to
> persist an RDD that's too big for memory, even with MEMORY_AND_DISK, you'll
> often still get OOMs.  I had to carefully modify some of the space tuning
> parameters and GC settings to get some jobs to even finish.
>
> The other issue I've observed is if you group on a key that is highly
> skewed, with a few massively-common keys and a long tail of rare keys, the
> one massive key can be too big for a single machine and again cause OOMs.
>
> I'm hopeful that off-heap caching (Tachyon) could fix some of these issues.
>
> Just my personal experience, but I've observed significant improvements in
> stability since even the 0.7.x days, so I'm confident that things will
> continue to get better as long as people report what they're seeing so it
> can get fixed.
>
> Andrew
>
>

Re: Spark - ready for prime time?

Posted by Surendranauth Hiraman <su...@velos.io>.
Excellent, thank you.



On Fri, Apr 11, 2014 at 12:09 PM, Matei Zaharia <ma...@gmail.com> wrote:

> It's not a new API, it just happens underneath the current one if you have
> spark.shuffle.spill set to true (which it is by default). Take a look at
> the config settings that mention "spill" in
> http://spark.incubator.apache.org/docs/latest/configuration.html.
>
> Matei
>
> On Apr 11, 2014, at 7:02 AM, Surendranauth Hiraman <su...@velos.io>
> wrote:
>
> Matei,
>
> Where is the functionality in 0.9 to spill data within a task (separately
> from persist)? My apologies if this is something obvious but I don't see it
> in the api docs.
>
> -Suren
>
>
>


-- 

SUREN HIRAMAN, VP TECHNOLOGY
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR
NEW YORK, NY 10001
O: (917) 525-2466 ext. 105
F: 646.349.4063
E: suren.hiraman@velos.io
W: www.velos.io

Re: Spark - ready for prime time?

Posted by Matei Zaharia <ma...@gmail.com>.
It’s not a new API, it just happens underneath the current one if you have spark.shuffle.spill set to true (which it is by default). Take a look at the config settings that mention “spill” in http://spark.incubator.apache.org/docs/latest/configuration.html.
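
Concretely, that means something like this in your SparkConf (the second
setting controls roughly how much of the heap the in-task aggregation buffers
may use before they start spilling to disk):

  conf.set("spark.shuffle.spill", "true")
      .set("spark.shuffle.memoryFraction", "0.3")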

Matei

On Apr 11, 2014, at 7:02 AM, Surendranauth Hiraman <su...@velos.io> wrote:

> Matei,
> 
> Where is the functionality in 0.9 to spill data within a task (separately from persist)? My apologies if this is something obvious but I don't see it in the api docs.
> 
> -Suren
> 
> 
> 


Re: Spark - ready for prime time?

Posted by Surendranauth Hiraman <su...@velos.io>.
Matei,

Where is the functionality in 0.9 to spill data within a task (separately
from persist)? My apologies if this is something obvious but I don't see it
in the api docs.

-Suren



On Thu, Apr 10, 2014 at 3:59 PM, Matei Zaharia <ma...@gmail.com> wrote:

> To add onto the discussion about memory working space, 0.9 introduced the
> ability to spill data within a task to disk, and in 1.0 we're also changing
> the interface to allow spilling data within the same *group* to disk (e.g.
> when you do groupBy and get a key with lots of values). The main reason
> these weren't there was that for a lot of workloads (everything except the
> same key having lots of values), simply launching more reduce tasks was
> also a good solution, because it results in an external sort across the
> cluster similar to what would happen within a task.
>
> Overall, expect to see more work to both explain how things execute (
> http://spark.incubator.apache.org/docs/latest/tuning.html is one example,
> the monitoring UI is another) and try to make things require no
> configuration out of the box. We're doing a lot of this based on user
> feedback, so that's definitely appreciated.
>
> Matei
>


-- 

SUREN HIRAMAN, VP TECHNOLOGY
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR
NEW YORK, NY 10001
O: (917) 525-2466 ext. 105
F: 646.349.4063
E: suren.hiraman@velos.io <su...@sociocast.com>
W: www.velos.io

Re: Spark - ready for prime time?

Posted by Matei Zaharia <ma...@gmail.com>.
To add onto the discussion about memory working space, 0.9 introduced the ability to spill data within a task to disk, and in 1.0 we’re also changing the interface to allow spilling data within the same *group* to disk (e.g. when you do groupBy and get a key with lots of values). The main reason these weren’t there was that for a lot of workloads (everything except the same key having lots of values), simply launching more reduce tasks was also a good solution, because it results in an external sort across the cluster similar to what would happen within a task.
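
For example, here is a toy sketch (made-up data and numbers) of what "just launch more reduce tasks" looks like in user code - both groupByKey and reduceByKey accept an explicit number of reduce tasks:

  import org.apache.spark.{SparkConf, SparkContext}
  import org.apache.spark.SparkContext._

  object MoreReduceTasks {
    def main(args: Array[String]): Unit = {
      val sc = new SparkContext(
        new SparkConf().setAppName("more-reduce-tasks").setMaster("local[2]"))

      // Toy pair RDD standing in for a real dataset with some heavy keys.
      val events = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 3), ("a", 4)))

      // More reduce tasks -> smaller groups per task -> less memory pressure.
      val grouped = events.groupByKey(8)

      // For pure aggregations, reduceByKey avoids materializing whole groups.
      val counts = events.reduceByKey(_ + _, 8)

      println(grouped.mapValues(_.size).collect().toSeq)
      println(counts.collect().toSeq)
      sc.stop()
    }
  }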

Overall, expect to see more work to both explain how things execute (http://spark.incubator.apache.org/docs/latest/tuning.html is one example, the monitoring UI is another) and try to make things require no configuration out of the box. We’re doing a lot of this based on user feedback, so that’s definitely appreciated.

Matei

On Apr 10, 2014, at 10:33 AM, Dmitriy Lyubimov <dl...@gmail.com> wrote:

> On Thu, Apr 10, 2014 at 9:24 AM, Andrew Ash <an...@andrewash.com> wrote:
> The biggest issue I've come across is that the cluster is somewhat unstable when under memory pressure.  Meaning that if you attempt to persist an RDD that's too big for memory, even with MEMORY_AND_DISK, you'll often still get OOMs.  I had to carefully modify some of the space tuning parameters and GC settings to get some jobs to even finish.
> 
> The other issue I've observed is if you group on a key that is highly skewed, with a few massively-common keys and a long tail of rare keys, the one massive key can be too big for a single machine and again cause OOMs.
> 
> My take on it -- Spark doesn't believe in sort-and-spill things to enable super long groups, and IMO for a good reason. Here are my thoughts:
> 
> (1) in my work i don't need "sort" in 99% of the cases, i only need "group" which absolutely doesn't need the spill which makes things slow down to a crawl. 
> (2) if that's an aggregate (such as group count), use combine(), not groupByKey -- this will do tons of good on memory use.
> (3) if you really need groups that don't fit into memory, that is always because you want to do something that is other than aggregation, with them. E,g build an index of that grouped data. we actually had a case just like that. In this case your friend is really not groupBy, but rather PartitionBy. I.e. what happens there you build a quick count sketch, perhaps on downsampled data, to figure which keys have sufficiently "big" count -- and then you build a partitioner that redirects large groups to a dedicated map(). assuming this map doesn't try to load things in memory but rather do something like streaming BTree build, that should be fine. In certain cituations such processing may require splitting super large group even into smaller sub groups (e.g. partitioned BTree structure), at which point you should be fine even from uniform load point of view. It takes a little of jiu-jitsu to do it all, but it is not Spark's fault here, it did not promise do this all for you in the groupBy contract.
> 
>  
> 
> I'm hopeful that off-heap caching (Tachyon) could fix some of these issues.
> 
> Just my personal experience, but I've observed significant improvements in stability since even the 0.7.x days, so I'm confident that things will continue to get better as long as people report what they're seeing so it can get fixed.
> 
> Andrew
> 
> 
> On Thu, Apr 10, 2014 at 4:08 PM, Alex Boisvert <al...@gmail.com> wrote:
> I'll provide answers from our own experience at Bizo.  We've been using Spark for 1+ year now and have found it generally better than previous approaches (Hadoop + Hive mostly).
> 
> 
> 
> On Thu, Apr 10, 2014 at 7:11 AM, Andras Nemeth <an...@lynxanalytics.com> wrote:
> I. Is it too much magic? Lots of things "just work right" in Spark and it's extremely convenient and efficient when it indeed works. But should we be worried that customization is hard if the built in behavior is not quite right for us? Are we to expect hard to track down issues originating from the black box behind the magic?
> 
> I think is goes back to understanding Spark's architecture, its design constraints and the problems it explicitly set out to address.   If the solution to your problems can be easily formulated in terms of the map/reduce model, then it's a good choice.  You'll want your "customizations" to go with (not against) the grain of the architecture.
>  
> II. Is it mature enough? E.g. we've created a pull request which fixes a problem that we were very surprised no one ever stumbled upon before. So that's why I'm asking: is Spark being already used in professional settings? Can one already trust it being reasonably bug free and reliable?
> 
> There are lots of ways to use Spark; and not all of the features are necessarily at the same level of maturity.   For instance, we put all the jars on the main classpath so we've never run into the issue your pull request addresses.
> 
> We definitely use and rely on Spark on a professional basis.  We have 5+ spark jobs running nightly on Amazon's EMR, slicing through GBs of data.   Once we got them working with the proper configuration settings, they have been running reliability since.
> 
> I would characterize our use of Spark as a "better Hadoop", in the sense that we use it for batch processing only, no streaming yet.   We're happy it performs better than Hadoop but we don't require/rely on its memory caching features.  In fact, for most of our jobs it would simplify our lives if Spark wouldn't cache so many things in memory since it would make configuration/tuning a lot simpler and jobs would run successfully on the first try instead of having to tweak things (# of partitions and such).
> 
> So, to the concrete issues. Sorry for the long mail, and let me know if I should break this out into more threads or if there is some other way to have this discussion...
> 
> 1. Memory management
> The general direction of these questions is whether it's possible to take RDD caching related memory management more into our own hands as LRU eviction is nice most of the time but can be very suboptimal in some of our use cases.
> A. Somehow prioritize cached RDDs, E.g. mark some "essential" that one really wants to keep. I'm fine with going down in flames if I mark too much data essential.
> B. Memory "reflection": can you pragmatically get the memory size of a cached rdd and memory sizes available in total/per executor? If we could do this we could indirectly avoid automatic evictions of things we might really want to keep in memory.
> C. Evictions caused by RDD partitions on the driver. I had a setup with huge worker memory and smallish memory on the driver JVM. To my surprise, the system started to cache RDD partitions on the driver as well. As the driver ran out of memory I started to see evictions while there were still plenty of space on workers. This resulted in lengthy recomputations. Can this be avoided somehow?
> D. Broadcasts. Is it possible to get rid of a broadcast manually, without waiting for the LRU eviction taking care of it? Can you tell the size of a broadcast programmatically?
> 
> 
> 2. Akka lost connections
> We have quite often experienced lost executors due to akka exceptions - mostly connection lost or similar. It seems to happen when an executor gets extremely busy with some CPU intensive work. Our hypothesis is that akka network threads get starved and the executor fails to respond within timeout limits. Is this plausible? If yes, what can we do with it?
> 
> We've seen these as well.  In our case, increasing the akka timeouts and framesize helped a lot.
> 
> e.g. spark.akka.{timeout, askTimeout, lookupTimeout, frameSize}
>  
> 
> In general, these are scary errors in the sense that they come from the very core of the framework and it's hard to link it to something we do in our own code, and thus hard to find a fix. So a question more for the community: how often do you end up scratching your head about cases where spark 
> magic doesn't work perfectly?
> 
> For us, this happens most often for jobs processing TBs of data (instead of GBs)... which is frustrating of course because these jobs cost a lot more in $$$ + time to run/debug/diagnose than smaller jobs.
> 
> It means we have to comb the logs to understand what happened, interpret stack traces, dump memory / object allocations, read Spark source to formulate hypothesis about what went wrong and then trial + error to get to a configuration that works.   Again, if Spark had better defaults and more conservative execution model (rely less on in-memory caching of RDDs and associated metadata, keepings large communication buffers on the heap, etc.), it would definitely simplify our lives.  
> 
> (Though I recognize that others might use Spark very differently and that these defaults and conservative behavior might not please everybody.)
> 
> Hopefully this is the kind of feedback you were looking for...
> 
> 
> 3. Recalculation of cached rdds
> I see the following scenario happening. I load two RDDs A,B from disk, cache them and then do some jobs on them, at the very least a count on each. After these jobs are done I see on the storage panel that 100% of these RDDs are cached in memory.
> 
> Then I create a third RDD C which is created by multiple joins and maps from A and B, also cache it and start a job on C. When I do this I still see A and B completely cached and also see C slowly getting more and more cached. This is all fine and good, but in the meanwhile I see stages running on the UI that point to code which is used to load A and B. How is this possible? Am I misunderstanding how cached RDDs should behave?
> 
> And again the general question - how can one debug such issues?
> 
> 4. Shuffle on disk
> Is it true - I couldn't find it in official docs, but did see this mentioned in various threads - that shuffle _always_ hits disk? (Disregarding OS caches.) Why is this the case? Are you planning to add a function to do shuffle in memory or are there some intrinsic reasons for this to be impossible?
> 
> 
> Sorry again for the giant mail, and thanks for any insights!
> 
> Andras
> 
> 
> 
> 
> 


Re: Spark - ready for prime time?

Posted by Brad Miller <bm...@eecs.berkeley.edu>.
> 4. Shuffle on disk
> Is it true - I couldn't find it in official docs, but did see this mentioned
> in various threads - that shuffle _always_ hits disk? (Disregarding OS
> caches.) Why is this the case? Are you planning to add a function to do
> shuffle in memory or are there some intrinsic reasons for this to be
> impossible?
>
> I don't think it's true... as far as I'm concerned Spark doesn't peek into
> the OS and force it to disregard buffer caches. In general, for large
> shuffles, all shuffle files do not fit into memory, so we kind of have to
> write them out to disk. There is an undocumented option to sync writing
> shuffle files to disk every time we write a block, but that is by default
> false and not many people use it (for obvious reasons).

I believe I recently observed that, for the map portion of the
shuffle, all shuffle files seemed to be written into the file system
(albeit potentially only into buffer caches).  The size of the shuffle
files on each host matched the "shuffle write" metric shown in the UI
(pyspark branch-0.9 as of Monday), so there didn't seem to be any
effort to keep the shuffle files in memory.


Re: Spark - ready for prime time?

Posted by Andras Nemeth <an...@lynxanalytics.com>.
Thanks everybody for the answers, this was very helpful! Even just the
number of answers is pretty reassuring. ;)

As for what other technologies we are considering: there is probably no
other single framework that promises to solve all our problems, but we
could go with some combination of scalding/cascading for things well
suited to the MR style of computation, maybe some BSP framework for
iterative work, and something like impala for serving data.

Some more questions/clarifications inline, mostly on the more technical
side of things.

On Thu, Apr 10, 2014 at 9:43 PM, Andrew Or <an...@databricks.com> wrote:

> Here are answers to a subset of your questions:
>
> > 1. Memory management
> > The general direction of these questions is whether it's possible to
> take RDD caching related memory management more into our own hands as LRU
> eviction is nice most of the time but can be very suboptimal in some of our
> use cases.
> > A. Somehow prioritize cached RDDs, E.g. mark some "essential" that one
> really wants to keep. I'm fine with going down in flames if I mark too much
> data
>
> As far as I am aware, there is currently no other eviction policies for
> RDD blocks other than LRU. Your suggestion of prioritizing RDDs is an
> interesting one and I'm sure other users would like that as well.
>
> > B. Memory "reflection": can you pragmatically get the memory size of a
> cached rdd and memory sizes available in total/per executor? If we could do
> this we could indirectly avoid automatic evictions of things we might
> really want to keep in memory.
>
> All this information should be displayed on the UI under the Storage tab.
>
Sorry, I mistyped my original question, I wanted to say programmatically
and not pragmatically. :) So I know it is available on the UI but I'd like
to access this information from my driver program. Is it possible?
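
(For reference, the closest thing I could find so far is something along
these lines, run in the driver / spark-shell where sc is the SparkContext -
but I'm not sure this is the intended or stable way:)

  // Per-executor memory: block manager id -> (max memory, remaining memory), in bytes.
  sc.getExecutorMemoryStatus.foreach { case (executor, (max, remaining)) =>
    println(s"$executor: $remaining of $max bytes free for storage")
  }

  // In 1.0 there also seems to be a developer API exposing cached RDD sizes.
  sc.getRDDStorageInfo.foreach { info =>
    println(s"RDD ${info.id} (${info.name}): ${info.memSize} bytes in memory")
  }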


>
>
> > C. Evictions caused by RDD partitions on the driver. I had a setup with
> huge worker memory and smallish memory on the driver JVM. To my surprise,
> the system started to cache RDD partitions on the driver as well. As the
> driver ran out of memory I started to see evictions while there were still
> > plenty of space on workers. This resulted in lengthy recomputations. Can
> this be avoided somehow?
>
> The amount of space used for RDD storage is only a fraction of the total
> amount of memory available to the JVM. More specifically, it is governed by
> `spark.storage.memoryFraction`, which is by default 60%. This may explain
> why evictions seem to occur pre-maturely sometimes.
>
Yes, I'm aware of these. I'm looking at the executors tab, which I believe
already shows the 60% available for RDDs. Also, I only see evictions on the
driver. So I guess my point is: could I force the system to never store
anything on the driver? (See the attached screenshot, which also shows that
the whole cluster is fine but the driver is getting OOMs.) Also, since
actual tasks never run on the driver (which seems like the right thing to
happen), why do we ever cache RDD partitions there? We can never use them
without copying them somewhere else anyway.


> In the future, we should probably add a table that contains information
> about evicted RDDs on the UI, so it's easier to track them.
>
Yep, that would be great!


> Right now evicted RDD's disappear from the face of the planet completely,
> sometimes leaving the user somewhat confounded. Though with off-heap
> storage (Tachyon) this may become less relevant.
>
> > D. Broadcasts. Is it possible to get rid of a broadcast manually,
> without waiting for the LRU eviction taking care of it? Can you tell the
> size of a broadcast programmatically?
>
> In Spark 1.0, the mechanism to unpersist blocks used by a broadcast is
> explicitly added! Under the storage tab of the UI, we could probably also
> have a Broadcast table in the future, seeing that there are users
> interested in this feature.
>
Also sounds great!


>
>
> > 3. Recalculation of cached rdds
> > I see the following scenario happening. I load two RDDs A,B from disk,
> cache them and then do some jobs on them, at the very least a count on
> each. After these jobs are done I see on the storage panel that 100% of
> these RDDs are cached in memory.
> > Then I create a third RDD C which is created by multiple joins and maps
> from A and B, also cache it and start a job on C. When I do this I still
> see A and B completely cached and also see C slowly getting more and more
> cached. This is all fine and good, but in the meanwhile I see stages
> running on the UI that point to code which is used to load A and B. How is
> this possible? Am I misunderstanding how cached RDDs should behave?
> > And again the general question - how can one debug such issues?
>
> From the fractions of RDDs cached in memory, it seems to me that your
> application is running as expected. If you also cache C, then it will
> slowly add more blocks to storage, possibly evicting A and B if there is
> memory pressure. It's entirely possible that there is a bug on finding the
> call site on the stages page (there were a few PRs that made changes to
> this recently).
>
I think you are right, and maybe it's not even really a bug, just a
somewhat confusing UI. It seems that if the fully cached RDD X was
originally created on line N, then when one reuses this cached RDD in a
non-narrow dependency, line N will be reported as running again - but the
actual tasks will be very fast, probably just confirming that the data is
already there and perhaps shipping it somewhere; I'm not sure.


> 4. Shuffle on disk
> Is it true - I couldn't find it in official docs, but did see this
> mentioned in various threads - that shuffle _always_ hits disk?
> (Disregarding OS caches.) Why is this the case? Are you planning to add a
> function to do shuffle in memory or are there some intrinsic reasons for
> this to be impossible?
>
> I don't think it's true... as far as I'm concerned Spark doesn't peek into
> the OS and force it to disregard buffer caches.
>
Sorry, I guess the "disregarding OS caches" part wasn't really clear. What
I meant is the following:
- Spark always writes shuffle data into files on disk.
- This may not result in actual disk IO, as sync is never called, so if the
output fits in the OS cache, then everything effectively stays in memory.
Is this a correct summary of what's happening in Spark?


> In general, for large shuffles, all shuffle files do not fit into memory,
> so we kind of have to write them out to disk.
>
Not sure I get this... Of course, it can happen that the shuffle data
doesn't fit in memory, but then the resulting RDD likely won't fit in
memory either - at least not all of it at the same time. So couldn't you
use the same argument to always write RDDs to disk, saying that large RDDs
do not fit in memory so one kind of has to write them to disk? Sorry if I'm
asking something stupid...


Cheers,
Andras



Re: Spark - ready for prime time?

Posted by Andrew Or <an...@databricks.com>.
Here are answers to a subset of your questions:

> 1. Memory management
> The general direction of these questions is whether it's possible to take
RDD caching related memory management more into our own hands as LRU
eviction is nice most of the time but can be very suboptimal in some of our
use cases.
> A. Somehow prioritize cached RDDs, E.g. mark some "essential" that one
really wants to keep. I'm fine with going down in flames if I mark too much
data

As far as I am aware, there are currently no eviction policies for RDD
blocks other than LRU. Your suggestion of prioritizing RDDs is an
interesting one, and I'm sure other users would like that as well.
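
One partial workaround in the meantime: persist the RDDs you really care
about with a storage level that falls back to disk, so that an eviction
means a local disk read rather than a full recomputation. A rough sketch
(made-up path, assuming sc is your SparkContext):

  import org.apache.spark.storage.StorageLevel

  // "Essential" data: keep it in memory while there is room, spill evicted
  // partitions to local disk instead of dropping them entirely.
  val essential = sc.textFile("hdfs:///data/essential")
    .map(_.length)
    .persist(StorageLevel.MEMORY_AND_DISK)

  // Scratch data can stay MEMORY_ONLY and simply be recomputed if evicted.
  val scratch = essential.map(_ * 2).persist(StorageLevel.MEMORY_ONLY)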

> B. Memory "reflection": can you pragmatically get the memory size of a
cached rdd and memory sizes available in total/per executor? If we could do
this we could indirectly avoid automatic evictions of things we might
really want to keep in memory.

All this information should be displayed on the UI under the Storage tab.

> C. Evictions caused by RDD partitions on the driver. I had a setup with
huge worker memory and smallish memory on the driver JVM. To my surprise,
the system started to cache RDD partitions on the driver as well. As the
driver ran out of memory I started to see evictions while there were still
> plenty of space on workers. This resulted in lengthy recomputations. Can
this be avoided somehow?

The amount of space used for RDD storage is only a fraction of the total
amount of memory available to the JVM. More specifically, it is governed by
`spark.storage.memoryFraction`, which is 60% by default. This may explain
why evictions sometimes seem to occur prematurely. In the future, we
should probably add a table that contains information about evicted RDDs on
the UI, so it's easier to track them. Right now evicted RDDs disappear
from the face of the planet completely, sometimes leaving the user somewhat
confounded. Though with off-heap storage (Tachyon) this may become less
relevant.
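
For what it's worth, that fraction is configurable, e.g. (illustrative
value only):

  import org.apache.spark.{SparkConf, SparkContext}

  // Give the block manager a smaller slice of the executor heap, leaving
  // more room for task working memory. 0.6 is the default.
  val conf = new SparkConf()
    .setAppName("storage-fraction-example")
    .set("spark.storage.memoryFraction", "0.5")
  val sc = new SparkContext(conf)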

> D. Broadcasts. Is it possible to get rid of a broadcast manually, without
waiting for the LRU eviction taking care of it? Can you tell the size of a
broadcast programmatically?

In Spark 1.0, a mechanism to explicitly unpersist the blocks used by a
broadcast has been added! Under the Storage tab of the UI, we could
probably also have a Broadcast table in the future, seeing that there are
users interested in this feature.
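
If I remember the 1.0 API correctly, it looks roughly like this (sketch,
assuming sc is your SparkContext):

  // Ship a small lookup table to the executors.
  val lookup = sc.broadcast(Map("a" -> 1, "b" -> 2))

  val tagged = sc.parallelize(Seq("a", "b", "a"))
    .map(k => (k, lookup.value.getOrElse(k, 0)))
  tagged.count()

  // New in 1.0: explicitly remove the broadcast's blocks from the executors
  // instead of waiting for LRU eviction to get around to them.
  lookup.unpersist()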

> 3. Recalculation of cached rdds
> I see the following scenario happening. I load two RDDs A,B from disk,
cache them and then do some jobs on them, at the very least a count on
each. After these jobs are done I see on the storage panel that 100% of
these RDDs are cached in memory.
> Then I create a third RDD C which is created by multiple joins and maps
from A and B, also cache it and start a job on C. When I do this I still
see A and B completely cached and also see C slowly getting more and more
cached. This is all fine and good, but in the meanwhile I see stages
running on the UI that point to code which is used to load A and B. How is
this possible? Am I misunderstanding how cached RDDs should behave?
> And again the general question - how can one debug such issues?

From the fractions of RDDs cached in memory, it seems to me that your
application is running as expected. If you also cache C, then it will
slowly add more blocks to storage, possibly evicting A and B if there is
memory pressure. It's entirely possible that there is a bug in finding the
call site on the stages page (there were a few PRs that made changes to
this recently).
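
As for debugging such issues, one low-tech trick is to print the lineage
and storage level of the RDD before launching the job, e.g. (here `c`
stands for the joined RDD from your example):

  // The stages that show up in the UI should correspond to pieces of this
  // lineage, which helps correlate "mystery" stages with your code.
  println(c.toDebugString)
  println(c.getStorageLevel)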

> 4. Shuffle on disk
> Is it true - I couldn't find it in official docs, but did see this
> mentioned in various threads - that shuffle _always_ hits disk?
> (Disregarding OS caches.) Why is this the case? Are you planning to add a
> function to do shuffle in memory or are there some intrinsic reasons for
> this to be impossible?

I don't think it's true... as far as I'm concerned Spark doesn't peek into
the OS and force it to disregard buffer caches. In general, for large
shuffles, all shuffle files do not fit into memory, so we kind of have to
write them out to disk. There is an undocumented option to sync writing
shuffle files to disk every time we write a block, but that is by default
false and not many people use it (for obvious reasons).



On Thu, Apr 10, 2014 at 12:05 PM, Roger Hoover <ro...@gmail.com>wrote:

> Can anyone comment on their experience running Spark Streaming in
> production?
>
>
> On Thu, Apr 10, 2014 at 10:33 AM, Dmitriy Lyubimov <dl...@gmail.com>wrote:
>
>>
>>
>>
>> On Thu, Apr 10, 2014 at 9:24 AM, Andrew Ash <an...@andrewash.com> wrote:
>>
>>> The biggest issue I've come across is that the cluster is somewhat
>>> unstable when under memory pressure.  Meaning that if you attempt to
>>> persist an RDD that's too big for memory, even with MEMORY_AND_DISK, you'll
>>> often still get OOMs.  I had to carefully modify some of the space tuning
>>> parameters and GC settings to get some jobs to even finish.
>>>
>>> The other issue I've observed is if you group on a key that is highly
>>> skewed, with a few massively-common keys and a long tail of rare keys, the
>>> one massive key can be too big for a single machine and again cause OOMs.
>>>
>>
>> My take on it -- Spark doesn't believe in sort-and-spill things to enable
>> super long groups, and IMO for a good reason. Here are my thoughts:
>>
>> (1) in my work i don't need "sort" in 99% of the cases, i only need
>> "group" which absolutely doesn't need the spill which makes things slow
>> down to a crawl.
>> (2) if that's an aggregate (such as group count), use combine(), not
>> groupByKey -- this will do tons of good on memory use.
>> (3) if you really need groups that don't fit into memory, that is always
>> because you want to do something that is other than aggregation, with them.
>> E,g build an index of that grouped data. we actually had a case just like
>> that. In this case your friend is really not groupBy, but rather
>> PartitionBy. I.e. what happens there you build a quick count sketch,
>> perhaps on downsampled data, to figure which keys have sufficiently "big"
>> count -- and then you build a partitioner that redirects large groups to a
>> dedicated map(). assuming this map doesn't try to load things in memory but
>> rather do something like streaming BTree build, that should be fine. In
>> certain cituations such processing may require splitting super large group
>> even into smaller sub groups (e.g. partitioned BTree structure), at which
>> point you should be fine even from uniform load point of view. It takes a
>> little of jiu-jitsu to do it all, but it is not Spark's fault here, it did
>> not promise do this all for you in the groupBy contract.
>>
>>
>>
>>>
>>> I'm hopeful that off-heap caching (Tachyon) could fix some of these
>>> issues.
>>>
>>> Just my personal experience, but I've observed significant improvements
>>> in stability since even the 0.7.x days, so I'm confident that things will
>>> continue to get better as long as people report what they're seeing so it
>>> can get fixed.
>>>
>>> Andrew
>>>
>>>
>>> On Thu, Apr 10, 2014 at 4:08 PM, Alex Boisvert <al...@gmail.com>wrote:
>>>
>>>> I'll provide answers from our own experience at Bizo.  We've been using
>>>> Spark for 1+ year now and have found it generally better than previous
>>>> approaches (Hadoop + Hive mostly).
>>>>
>>>>
>>>>
>>>> On Thu, Apr 10, 2014 at 7:11 AM, Andras Nemeth <
>>>> andras.nemeth@lynxanalytics.com> wrote:
>>>>
>>>>> I. Is it too much magic? Lots of things "just work right" in Spark and
>>>>> it's extremely convenient and efficient when it indeed works. But should we
>>>>> be worried that customization is hard if the built in behavior is not quite
>>>>> right for us? Are we to expect hard to track down issues originating from
>>>>> the black box behind the magic?
>>>>>
>>>>
>>>> I think is goes back to understanding Spark's architecture, its design
>>>> constraints and the problems it explicitly set out to address.   If the
>>>> solution to your problems can be easily formulated in terms of the
>>>> map/reduce model, then it's a good choice.  You'll want your
>>>> "customizations" to go with (not against) the grain of the architecture.
>>>>
>>>>
>>>>> II. Is it mature enough? E.g. we've created a pull request<https://github.com/apache/spark/pull/181>which fixes a problem that we were very surprised no one ever stumbled upon
>>>>> before. So that's why I'm asking: is Spark being already used in
>>>>> professional settings? Can one already trust it being reasonably bug free
>>>>> and reliable?
>>>>>
>>>>
>>>> There are lots of ways to use Spark; and not all of the features are
>>>> necessarily at the same level of maturity.   For instance, we put all the
>>>> jars on the main classpath so we've never run into the issue your pull
>>>> request addresses.
>>>>
>>>> We definitely use and rely on Spark on a professional basis.  We have
>>>> 5+ spark jobs running nightly on Amazon's EMR, slicing through GBs of
>>>> data.   Once we got them working with the proper configuration settings,
>>>> they have been running reliability since.
>>>>
>>>> I would characterize our use of Spark as a "better Hadoop", in the
>>>> sense that we use it for batch processing only, no streaming yet.   We're
>>>> happy it performs better than Hadoop but we don't require/rely on its
>>>> memory caching features.  In fact, for most of our jobs it would simplify
>>>> our lives if Spark wouldn't cache so many things in memory since it would
>>>> make configuration/tuning a lot simpler and jobs would run successfully on
>>>> the first try instead of having to tweak things (# of partitions and such).
>>>>
>>>> So, to the concrete issues. Sorry for the long mail, and let me know if
>>>>> I should break this out into more threads or if there is some other way to
>>>>> have this discussion...
>>>>>
>>>>> 1. Memory management
>>>>> The general direction of these questions is whether it's possible to
>>>>> take RDD caching related memory management more into our own hands as LRU
>>>>> eviction is nice most of the time but can be very suboptimal in some of our
>>>>> use cases.
>>>>> A. Somehow prioritize cached RDDs, E.g. mark some "essential" that one
>>>>> really wants to keep. I'm fine with going down in flames if I mark too much
>>>>> data essential.
>>>>> B. Memory "reflection": can you pragmatically get the memory size of a
>>>>> cached rdd and memory sizes available in total/per executor? If we could do
>>>>> this we could indirectly avoid automatic evictions of things we might
>>>>> really want to keep in memory.
>>>>> C. Evictions caused by RDD partitions on the driver. I had a setup
>>>>> with huge worker memory and smallish memory on the driver JVM. To my
>>>>> surprise, the system started to cache RDD partitions on the driver as well.
>>>>> As the driver ran out of memory I started to see evictions while there were
>>>>> still plenty of space on workers. This resulted in lengthy recomputations.
>>>>> Can this be avoided somehow?
>>>>> D. Broadcasts. Is it possible to get rid of a broadcast manually,
>>>>> without waiting for the LRU eviction taking care of it? Can you tell the
>>>>> size of a broadcast programmatically?
>>>>>
>>>>>
>>>>> 2. Akka lost connections
>>>>> We have quite often experienced lost executors due to akka exceptions
>>>>> - mostly connection lost or similar. It seems to happen when an executor
>>>>> gets extremely busy with some CPU intensive work. Our hypothesis is that
>>>>> akka network threads get starved and the executor fails to respond within
>>>>> timeout limits. Is this plausible? If yes, what can we do with it?
>>>>>
>>>>
>>>> We've seen these as well.  In our case, increasing the akka timeouts
>>>> and framesize helped a lot.
>>>>
>>>> e.g. spark.akka.{timeout, askTimeout, lookupTimeout, frameSize}
>>>>
>>>>
>>>>>
>>>>> In general, these are scary errors in the sense that they come from
>>>>> the very core of the framework and it's hard to link it to something we do
>>>>> in our own code, and thus hard to find a fix. So a question more for the
>>>>> community: how often do you end up scratching your head about cases where
>>>>> spark magic doesn't work perfectly?
>>>>>
>>>>
>>>> For us, this happens most often for jobs processing TBs of data
>>>> (instead of GBs)... which is frustrating of course because these jobs cost
>>>> a lot more in $$$ + time to run/debug/diagnose than smaller jobs.
>>>>
>>>> It means we have to comb the logs to understand what happened,
>>>> interpret stack traces, dump memory / object allocations, read Spark source
>>>> to formulate hypothesis about what went wrong and then trial + error to get
>>>> to a configuration that works.   Again, if Spark had better defaults and
>>>> more conservative execution model (rely less on in-memory caching of RDDs
>>>> and associated metadata, keeping large communication buffers on the heap,
>>>> etc.), it would definitely simplify our lives.
>>>>
>>>> (Though I recognize that others might use Spark very differently and
>>>> that these defaults and conservative behavior might not please everybody.)
>>>>
>>>> Hopefully this is the kind of feedback you were looking for...
>>>>
>>>>
>>>>> 3. Recalculation of cached rdds
>>>>> I see the following scenario happening. I load two RDDs A,B from disk,
>>>>> cache them and then do some jobs on them, at the very least a count on
>>>>> each. After these jobs are done I see on the storage panel that 100% of
>>>>> these RDDs are cached in memory.
>>>>>
>>>>> Then I create a third RDD C which is created by multiple joins and
>>>>> maps from A and B, also cache it and start a job on C. When I do this I
>>>>> still see A and B completely cached and also see C slowly getting more and
>>>>> more cached. This is all fine and good, but in the meanwhile I see stages
>>>>> running on the UI that point to code which is used to load A and B. How is
>>>>> this possible? Am I misunderstanding how cached RDDs should behave?
>>>>>
>>>>> And again the general question - how can one debug such issues?
>>>>>
>>>>> 4. Shuffle on disk
>>>>> Is it true - I couldn't find it in official docs, but did see this
>>>>> mentioned in various threads - that shuffle _always_ hits disk?
>>>>> (Disregarding OS caches.) Why is this the case? Are you planning to add a
>>>>> function to do shuffle in memory or are there some intrinsic reasons for
>>>>> this to be impossible?
>>>>>
>>>>>
>>>>> Sorry again for the giant mail, and thanks for any insights!
>>>>>
>>>>> Andras
>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>

Re: Spark - ready for prime time?

Posted by Roger Hoover <ro...@gmail.com>.
Can anyone comment on their experience running Spark Streaming in
production?


On Thu, Apr 10, 2014 at 10:33 AM, Dmitriy Lyubimov <dl...@gmail.com>wrote:

>
>
>
> On Thu, Apr 10, 2014 at 9:24 AM, Andrew Ash <an...@andrewash.com> wrote:
>
>> The biggest issue I've come across is that the cluster is somewhat
>> unstable when under memory pressure.  Meaning that if you attempt to
>> persist an RDD that's too big for memory, even with MEMORY_AND_DISK, you'll
>> often still get OOMs.  I had to carefully modify some of the space tuning
>> parameters and GC settings to get some jobs to even finish.
>>
>> The other issue I've observed is if you group on a key that is highly
>> skewed, with a few massively-common keys and a long tail of rare keys, the
>> one massive key can be too big for a single machine and again cause OOMs.
>>
>
> My take on it -- Spark doesn't believe in sort-and-spill things to enable
> super long groups, and IMO for a good reason. Here are my thoughts:
>
> (1) In my work I don't need "sort" in 99% of the cases; I only need
> "group", which absolutely doesn't need the spill that makes things slow
> down to a crawl.
> (2) If that's an aggregate (such as a group count), use combine(), not
> groupByKey -- this will do tons of good for memory use.
> (3) If you really need groups that don't fit into memory, that is always
> because you want to do something other than aggregation with them, e.g.
> build an index of that grouped data. We actually had a case just like
> that. In this case your friend is really not groupBy, but rather
> partitionBy. I.e. what happens there is you build a quick count sketch,
> perhaps on downsampled data, to figure out which keys have a sufficiently
> "big" count -- and then you build a partitioner that redirects large
> groups to a dedicated map(). Assuming this map doesn't try to load things
> into memory but rather does something like a streaming BTree build, that
> should be fine. In certain situations such processing may require
> splitting a super large group even into smaller sub-groups (e.g. a
> partitioned BTree structure), at which point you should be fine even from
> a uniform load point of view. It takes a little jiu-jitsu to do it all,
> but it is not Spark's fault here; it did not promise to do all this for
> you in the groupBy contract.
>
>
>
>>
>> I'm hopeful that off-heap caching (Tachyon) could fix some of these
>> issues.
>>
>> Just my personal experience, but I've observed significant improvements
>> in stability since even the 0.7.x days, so I'm confident that things will
>> continue to get better as long as people report what they're seeing so it
>> can get fixed.
>>
>> Andrew
>>
>>
>> On Thu, Apr 10, 2014 at 4:08 PM, Alex Boisvert <al...@gmail.com>wrote:
>>
>>> I'll provide answers from our own experience at Bizo.  We've been using
>>> Spark for 1+ year now and have found it generally better than previous
>>> approaches (Hadoop + Hive mostly).
>>>
>>>
>>>
>>> On Thu, Apr 10, 2014 at 7:11 AM, Andras Nemeth <
>>> andras.nemeth@lynxanalytics.com> wrote:
>>>
>>>> I. Is it too much magic? Lots of things "just work right" in Spark and
>>>> it's extremely convenient and efficient when it indeed works. But should we
>>>> be worried that customization is hard if the built in behavior is not quite
>>>> right for us? Are we to expect hard to track down issues originating from
>>>> the black box behind the magic?
>>>>
>>>
>>> I think it goes back to understanding Spark's architecture, its design
>>> constraints and the problems it explicitly set out to address.   If the
>>> solution to your problems can be easily formulated in terms of the
>>> map/reduce model, then it's a good choice.  You'll want your
>>> "customizations" to go with (not against) the grain of the architecture.
>>>
>>>
>>>> II. Is it mature enough? E.g. we've created a pull request<https://github.com/apache/spark/pull/181>which fixes a problem that we were very surprised no one ever stumbled upon
>>>> before. So that's why I'm asking: is Spark being already used in
>>>> professional settings? Can one already trust it being reasonably bug free
>>>> and reliable?
>>>>
>>>
>>> There are lots of ways to use Spark; and not all of the features are
>>> necessarily at the same level of maturity.   For instance, we put all the
>>> jars on the main classpath so we've never run into the issue your pull
>>> request addresses.
>>>
>>> We definitely use and rely on Spark on a professional basis.  We have 5+
>>> spark jobs running nightly on Amazon's EMR, slicing through GBs of data.
>>> Once we got them working with the proper configuration settings, they have
>>> been running reliably since.
>>>
>>> I would characterize our use of Spark as a "better Hadoop", in the sense
>>> that we use it for batch processing only, no streaming yet.   We're happy
>>> it performs better than Hadoop but we don't require/rely on its memory
>>> caching features.  In fact, for most of our jobs it would simplify our
>>> lives if Spark wouldn't cache so many things in memory since it would make
>>> configuration/tuning a lot simpler and jobs would run successfully on the
>>> first try instead of having to tweak things (# of partitions and such).
>>>
>>> So, to the concrete issues. Sorry for the long mail, and let me know if
>>>> I should break this out into more threads or if there is some other way to
>>>> have this discussion...
>>>>
>>>> 1. Memory management
>>>> The general direction of these questions is whether it's possible to
>>>> take RDD caching related memory management more into our own hands as LRU
>>>> eviction is nice most of the time but can be very suboptimal in some of our
>>>> use cases.
>>>> A. Somehow prioritize cached RDDs, E.g. mark some "essential" that one
>>>> really wants to keep. I'm fine with going down in flames if I mark too much
>>>> data essential.
>>>> B. Memory "reflection": can you pragmatically get the memory size of a
>>>> cached rdd and memory sizes available in total/per executor? If we could do
>>>> this we could indirectly avoid automatic evictions of things we might
>>>> really want to keep in memory.
>>>> C. Evictions caused by RDD partitions on the driver. I had a setup with
>>>> huge worker memory and smallish memory on the driver JVM. To my surprise,
>>>> the system started to cache RDD partitions on the driver as well. As the
>>>> driver ran out of memory I started to see evictions while there were still
>>>> plenty of space on workers. This resulted in lengthy recomputations. Can
>>>> this be avoided somehow?
>>>> D. Broadcasts. Is it possible to get rid of a broadcast manually,
>>>> without waiting for the LRU eviction taking care of it? Can you tell the
>>>> size of a broadcast programmatically?
>>>>
>>>>
>>>> 2. Akka lost connections
>>>> We have quite often experienced lost executors due to akka exceptions -
>>>> mostly connection lost or similar. It seems to happen when an executor gets
>>>> extremely busy with some CPU intensive work. Our hypothesis is that akka
>>>> network threads get starved and the executor fails to respond within
>>>> timeout limits. Is this plausible? If yes, what can we do with it?
>>>>
>>>
>>> We've seen these as well.  In our case, increasing the akka timeouts and
>>> framesize helped a lot.
>>>
>>> e.g. spark.akka.{timeout, askTimeout, lookupTimeout, frameSize}
>>>
>>>
>>>>
>>>> In general, these are scary errors in the sense that they come from the
>>>> very core of the framework and it's hard to link it to something we do in
>>>> our own code, and thus hard to find a fix. So a question more for the
>>>> community: how often do you end up scratching your head about cases where
>>>> spark magic doesn't work perfectly?
>>>>
>>>
>>> For us, this happens most often for jobs processing TBs of data (instead
>>> of GBs)... which is frustrating of course because these jobs cost a lot
>>> more in $$$ + time to run/debug/diagnose than smaller jobs.
>>>
>>> It means we have to comb the logs to understand what happened, interpret
>>> stack traces, dump memory / object allocations, read Spark source to
>>> formulate hypothesis about what went wrong and then trial + error to get to
>>> a configuration that works.   Again, if Spark had better defaults and more
>>> conservative execution model (rely less on in-memory caching of RDDs and
>>> associated metadata, keeping large communication buffers on the heap,
>>> etc.), it would definitely simplify our lives.
>>>
>>> (Though I recognize that others might use Spark very differently and
>>> that these defaults and conservative behavior might not please everybody.)
>>>
>>> Hopefully this is the kind of feedback you were looking for...
>>>
>>>
>>>> 3. Recalculation of cached rdds
>>>> I see the following scenario happening. I load two RDDs A,B from disk,
>>>> cache them and then do some jobs on them, at the very least a count on
>>>> each. After these jobs are done I see on the storage panel that 100% of
>>>> these RDDs are cached in memory.
>>>>
>>>> Then I create a third RDD C which is created by multiple joins and maps
>>>> from A and B, also cache it and start a job on C. When I do this I still
>>>> see A and B completely cached and also see C slowly getting more and more
>>>> cached. This is all fine and good, but in the meanwhile I see stages
>>>> running on the UI that point to code which is used to load A and B. How is
>>>> this possible? Am I misunderstanding how cached RDDs should behave?
>>>>
>>>> And again the general question - how can one debug such issues?
>>>>
>>>> 4. Shuffle on disk
>>>> Is it true - I couldn't find it in official docs, but did see this
>>>> mentioned in various threads - that shuffle _always_ hits disk?
>>>> (Disregarding OS caches.) Why is this the case? Are you planning to add a
>>>> function to do shuffle in memory or are there some intrinsic reasons for
>>>> this to be impossible?
>>>>
>>>>
>>>> Sorry again for the giant mail, and thanks for any insights!
>>>>
>>>> Andras
>>>>
>>>>
>>>>
>>>
>>
>

Re: Spark - ready for prime time?

Posted by Dmitriy Lyubimov <dl...@gmail.com>.
On Thu, Apr 10, 2014 at 9:24 AM, Andrew Ash <an...@andrewash.com> wrote:

> The biggest issue I've come across is that the cluster is somewhat
> unstable when under memory pressure.  Meaning that if you attempt to
> persist an RDD that's too big for memory, even with MEMORY_AND_DISK, you'll
> often still get OOMs.  I had to carefully modify some of the space tuning
> parameters and GC settings to get some jobs to even finish.
>
> The other issue I've observed is if you group on a key that is highly
> skewed, with a few massively-common keys and a long tail of rare keys, the
> one massive key can be too big for a single machine and again cause OOMs.
>

My take on it -- Spark doesn't believe in sort-and-spill things to enable
super long groups, and IMO for a good reason. Here are my thoughts:

(1) In my work I don't need "sort" in 99% of the cases; I only need "group",
which absolutely doesn't need the spill that makes things slow down to a
crawl.
(2) If that's an aggregate (such as a group count), use combine(), not
groupByKey -- this will do tons of good for memory use.
(3) If you really need groups that don't fit into memory, that is always
because you want to do something other than aggregation with them, e.g.
build an index of that grouped data. We actually had a case just like that.
In this case your friend is really not groupBy, but rather partitionBy.
I.e. what happens there is you build a quick count sketch, perhaps on
downsampled data, to figure out which keys have a sufficiently "big" count
-- and then you build a partitioner that redirects large groups to a
dedicated map(). Assuming this map doesn't try to load things into memory
but rather does something like a streaming BTree build, that should be fine.
In certain situations such processing may require splitting a super large
group even into smaller sub-groups (e.g. a partitioned BTree structure), at
which point you should be fine even from a uniform load point of view. It
takes a little jiu-jitsu to do it all, but it is not Spark's fault here; it
did not promise to do all this for you in the groupBy contract. (A rough
sketch of (2) and (3) follows below.)
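
A rough sketch of (2) and (3), assuming a spark-shell session (so sc already
exists) and a made-up tab-separated input of (key, payload) lines; the path,
sample rate, threshold and fan-out are purely illustrative:

import org.apache.spark.HashPartitioner

// Made-up input: one tab-separated (key, payload) record per line.
val pairs = sc.textFile("hdfs:///tmp/events").map { line =>
  val cols = line.split("\t", 2)
  (cols(0), cols(1))
}

// (2) Aggregate with a map-side combine (reduceByKey builds on combineByKey)
// instead of groupByKey, so no per-key list of values is ever materialized.
val counts = pairs.mapValues(_ => 1L).reduceByKey(_ + _)

// (3) Estimate the "big" keys from a small sample...
val heavyKeys = sc.broadcast(
  pairs.sample(false, 0.01, 42)
    .countByKey()
    .filter { case (_, n) => n > 10000 }   // illustrative threshold
    .keySet.toSet)

// ...then salt those keys so one huge logical group becomes fanOut smaller
// sub-groups, and partition by the salted key. The salt is derived from the
// payload so the mapping stays deterministic under recomputation.
val fanOut = 8
val salted = pairs.map { case (k, v) =>
  if (heavyKeys.value.contains(k))
    (k + "#" + ((v.hashCode & Int.MaxValue) % fanOut), v)
  else
    (k, v)
}
val partitioned = salted.partitionBy(new HashPartitioner(200))

// Each partition can now be streamed into an external structure (say, a BTree
// index) without ever holding a whole group in memory.
partitioned.foreachPartition { records =>
  records.foreach { case (saltedKey, payload) => () }  // stream into the index here
}

The downstream code has to merge the fanOut sub-groups of a heavy key itself,
which is exactly the extra jiu-jitsu described above.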



>
> I'm hopeful that off-heap caching (Tachyon) could fix some of these issues.
>
> Just my personal experience, but I've observed significant improvements in
> stability since even the 0.7.x days, so I'm confident that things will
> continue to get better as long as people report what they're seeing so it
> can get fixed.
>
> Andrew
>
>
> On Thu, Apr 10, 2014 at 4:08 PM, Alex Boisvert <al...@gmail.com>wrote:
>
>> I'll provide answers from our own experience at Bizo.  We've been using
>> Spark for 1+ year now and have found it generally better than previous
>> approaches (Hadoop + Hive mostly).
>>
>>
>>
>> On Thu, Apr 10, 2014 at 7:11 AM, Andras Nemeth <
>> andras.nemeth@lynxanalytics.com> wrote:
>>
>>> I. Is it too much magic? Lots of things "just work right" in Spark and
>>> it's extremely convenient and efficient when it indeed works. But should we
>>> be worried that customization is hard if the built in behavior is not quite
>>> right for us? Are we to expect hard to track down issues originating from
>>> the black box behind the magic?
>>>
>>
>> I think it goes back to understanding Spark's architecture, its design
>> constraints and the problems it explicitly set out to address.   If the
>> solution to your problems can be easily formulated in terms of the
>> map/reduce model, then it's a good choice.  You'll want your
>> "customizations" to go with (not against) the grain of the architecture.
>>
>>
>>> II. Is it mature enough? E.g. we've created a pull request<https://github.com/apache/spark/pull/181>which fixes a problem that we were very surprised no one ever stumbled upon
>>> before. So that's why I'm asking: is Spark being already used in
>>> professional settings? Can one already trust it being reasonably bug free
>>> and reliable?
>>>
>>
>> There are lots of ways to use Spark; and not all of the features are
>> necessarily at the same level of maturity.   For instance, we put all the
>> jars on the main classpath so we've never run into the issue your pull
>> request addresses.
>>
>> We definitely use and rely on Spark on a professional basis.  We have 5+
>> spark jobs running nightly on Amazon's EMR, slicing through GBs of data.
>> Once we got them working with the proper configuration settings, they have
>> been running reliably since.
>>
>> I would characterize our use of Spark as a "better Hadoop", in the sense
>> that we use it for batch processing only, no streaming yet.   We're happy
>> it performs better than Hadoop but we don't require/rely on its memory
>> caching features.  In fact, for most of our jobs it would simplify our
>> lives if Spark wouldn't cache so many things in memory since it would make
>> configuration/tuning a lot simpler and jobs would run successfully on the
>> first try instead of having to tweak things (# of partitions and such).
>>
>> So, to the concrete issues. Sorry for the long mail, and let me know if I
>>> should break this out into more threads or if there is some other way to
>>> have this discussion...
>>>
>>> 1. Memory management
>>> The general direction of these questions is whether it's possible to
>>> take RDD caching related memory management more into our own hands as LRU
>>> eviction is nice most of the time but can be very suboptimal in some of our
>>> use cases.
>>> A. Somehow prioritize cached RDDs, E.g. mark some "essential" that one
>>> really wants to keep. I'm fine with going down in flames if I mark too much
>>> data essential.
>>> B. Memory "reflection": can you pragmatically get the memory size of a
>>> cached rdd and memory sizes available in total/per executor? If we could do
>>> this we could indirectly avoid automatic evictions of things we might
>>> really want to keep in memory.
>>> C. Evictions caused by RDD partitions on the driver. I had a setup with
>>> huge worker memory and smallish memory on the driver JVM. To my surprise,
>>> the system started to cache RDD partitions on the driver as well. As the
>>> driver ran out of memory I started to see evictions while there were still
>>> plenty of space on workers. This resulted in lengthy recomputations. Can
>>> this be avoided somehow?
>>> D. Broadcasts. Is it possible to get rid of a broadcast manually,
>>> without waiting for the LRU eviction taking care of it? Can you tell the
>>> size of a broadcast programmatically?
>>>
>>>
>>> 2. Akka lost connections
>>> We have quite often experienced lost executors due to akka exceptions -
>>> mostly connection lost or similar. It seems to happen when an executor gets
>>> extremely busy with some CPU intensive work. Our hypothesis is that akka
>>> network threads get starved and the executor fails to respond within
>>> timeout limits. Is this plausible? If yes, what can we do with it?
>>>
>>
>> We've seen these as well.  In our case, increasing the akka timeouts and
>> framesize helped a lot.
>>
>> e.g. spark.akka.{timeout, askTimeout, lookupTimeout, frameSize}
>>
>>
>>>
>>> In general, these are scary errors in the sense that they come from the
>>> very core of the framework and it's hard to link it to something we do in
>>> our own code, and thus hard to find a fix. So a question more for the
>>> community: how often do you end up scratching your head about cases where
>>> spark magic doesn't work perfectly?
>>>
>>
>> For us, this happens most often for jobs processing TBs of data (instead
>> of GBs)... which is frustrating of course because these jobs cost a lot
>> more in $$$ + time to run/debug/diagnose than smaller jobs.
>>
>> It means we have to comb the logs to understand what happened, interpret
>> stack traces, dump memory / object allocations, read Spark source to
>> formulate hypothesis about what went wrong and then trial + error to get to
>> a configuration that works.   Again, if Spark had better defaults and more
>> conservative execution model (rely less on in-memory caching of RDDs and
>> associated metadata, keeping large communication buffers on the heap,
>> etc.), it would definitely simplify our lives.
>>
>> (Though I recognize that others might use Spark very differently and that
>> these defaults and conservative behavior might not please everybody.)
>>
>> Hopefully this is the kind of feedback you were looking for...
>>
>>
>>> 3. Recalculation of cached rdds
>>> I see the following scenario happening. I load two RDDs A,B from disk,
>>> cache them and then do some jobs on them, at the very least a count on
>>> each. After these jobs are done I see on the storage panel that 100% of
>>> these RDDs are cached in memory.
>>>
>>> Then I create a third RDD C which is created by multiple joins and maps
>>> from A and B, also cache it and start a job on C. When I do this I still
>>> see A and B completely cached and also see C slowly getting more and more
>>> cached. This is all fine and good, but in the meanwhile I see stages
>>> running on the UI that point to code which is used to load A and B. How is
>>> this possible? Am I misunderstanding how cached RDDs should behave?
>>>
>>> And again the general question - how can one debug such issues?
>>>
>>> 4. Shuffle on disk
>>> Is it true - I couldn't find it in official docs, but did see this
>>> mentioned in various threads - that shuffle _always_ hits disk?
>>> (Disregarding OS caches.) Why is this the case? Are you planning to add a
>>> function to do shuffle in memory or are there some intrinsic reasons for
>>> this to be impossible?
>>>
>>>
>>> Sorry again for the giant mail, and thanks for any insights!
>>>
>>> Andras
>>>
>>>
>>>
>>
>

Re: Spark - ready for prime time?

Posted by Andrew Ash <an...@andrewash.com>.
It's highly dependent on what the issue is with your particular job, but
the ones I modify most commonly are:

spark.storage.memoryFraction
spark.shuffle.memoryFraction
parallelism (a parameter on many RDD calls) -- increase from the default
level to get more, smaller tasks that are more likely to finish
Use Kryo

A while back I also modified:
spark.storage.blockManagerTimeoutIntervalMs -- when a stop-the-world GC on
a slave caused heartbeat timeouts but the slave would eventually recover, I
would bump up this parameter from its default (this was on 0.7.x).
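
As a hedged illustration (the values are made up, not recommendations), with
a SparkConf-based setup these knobs look roughly like:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("tuned-job")
  .set("spark.storage.memoryFraction", "0.4")   // smaller cache, more heap for tasks
  .set("spark.shuffle.memoryFraction", "0.3")   // more room before shuffles spill
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.storage.blockManagerTimeoutIntervalMs", "120000")  // ride out long GC pauses
val sc = new SparkContext(conf)

// Explicit parallelism: more, smaller tasks per stage.
val wordCounts = sc.textFile("hdfs:///tmp/input")
  .flatMap(_.split("\\s+"))
  .map(word => (word, 1L))
  .reduceByKey(_ + _, 400)   // 400 partitions instead of the default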

It's also been a while since I messed with GC tuning for Spark, but I'd
generally recommend capping your JVM size at about 31.5 GB so you can keep
compressed pointers.  Better to run multiple JVMs at that size than a
single one that's 128GB, for example.  I think the general practice for GC
tuning is to have your interactive-time JVMs (like the driver or the
master) run with the concurrent mark and sweep collector, and the bulk
computation JVMs (like a worker or executor) run with parallel GC.  Not
sure how the newer G1 collector fits into those.  And most of the time if
you're messing with GC parameters, your issues are actually at the Spark
level and you should be spending your time figuring out why that's causing
problems instead.

Another thing I did: if a job wouldn't finish but consisted of several
steps, I could manually save each step along the way to disk (HDFS) and
load from there.  A lot of my jobs only need to finish once, so as long as
it gets done (even if the process is more manual than it should be), that's ok.
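
For instance, a minimal sketch with made-up paths and record types, using the
saveAsObjectFile / objectFile pair from a spark-shell (sc already defined):

// Step 1: compute and materialize an intermediate result.
case class Event(user: String, bytes: Long)

val parsed = sc.textFile("hdfs:///tmp/raw-logs").flatMap { line =>
  line.split("\t") match {
    case Array(user, bytes) => Some(Event(user, bytes.toLong))
    case _                  => None   // skip lines without exactly two columns
  }
}
parsed.saveAsObjectFile("hdfs:///tmp/job/step1")

// Step 2, possibly a separate run: resume from the saved output rather than
// recomputing step 1's whole lineage if something downstream blows up.
val fromDisk = sc.objectFile[Event]("hdfs:///tmp/job/step1")
fromDisk.map(e => (e.user, e.bytes))
  .reduceByKey(_ + _)
  .saveAsTextFile("hdfs:///tmp/job/step2")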

Hope that helps!
Andrew



On Sun, Apr 13, 2014 at 4:33 PM, Jim Blomo <ji...@gmail.com> wrote:

> On Thu, Apr 10, 2014 at 12:24 PM, Andrew Ash <an...@andrewash.com> wrote:
> > The biggest issue I've come across is that the cluster is somewhat
> unstable
> > when under memory pressure.  Meaning that if you attempt to persist an
> RDD
> > that's too big for memory, even with MEMORY_AND_DISK, you'll often still
> get
> > OOMs.  I had to carefully modify some of the space tuning parameters and
> GC
> > settings to get some jobs to even finish.
>
> Would you mind sharing some of these settings?  Even just a GitHub
> gist would be helpful.  These are the main issues I've run into as
> well, and memory pressure also seems to be correlated with akka
> timeouts, possibly because of GC pauses.
>

Re: Spark - ready for prime time?

Posted by Jim Blomo <ji...@gmail.com>.
On Thu, Apr 10, 2014 at 12:24 PM, Andrew Ash <an...@andrewash.com> wrote:
> The biggest issue I've come across is that the cluster is somewhat unstable
> when under memory pressure.  Meaning that if you attempt to persist an RDD
> that's too big for memory, even with MEMORY_AND_DISK, you'll often still get
> OOMs.  I had to carefully modify some of the space tuning parameters and GC
> settings to get some jobs to even finish.

Would you mind sharing some of these settings?  Even just a GitHub
gist would be helpful.  These are the main issues I've run into as
well, and memory pressure also seems to be correlated with akka
timeouts, possibly because of GC pauses.

Re: Spark - ready for prime time?

Posted by Andrew Ash <an...@andrewash.com>.
The biggest issue I've come across is that the cluster is somewhat unstable
when under memory pressure.  Meaning that if you attempt to persist an RDD
that's too big for memory, even with MEMORY_AND_DISK, you'll often still
get OOMs.  I had to carefully modify some of the space tuning parameters
and GC settings to get some jobs to even finish.
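
One mitigation worth trying is serialized persistence; a minimal sketch
(made-up path, spark-shell with sc predefined) which reduces, but does not
eliminate, this kind of memory pressure:

import org.apache.spark.storage.StorageLevel

// Serialized caching (ideally with Kryo configured as well) keeps each cached
// partition much smaller on the heap than plain MEMORY_AND_DISK.
val big = sc.textFile("hdfs:///tmp/huge-dataset")
  .map(line => (line.takeWhile(_ != '\t'), line))
  .persist(StorageLevel.MEMORY_AND_DISK_SER)

big.count()   // materializes the cache, spilling what doesn't fit to disk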

The other issue I've observed is if you group on a key that is highly
skewed, with a few massively-common keys and a long tail of rare keys, the
one massive key can be too big for a single machine and again cause OOMs.

I'm hopeful that off-heap caching (Tachyon) could fix some of these issues.

Just my personal experience, but I've observed significant improvements in
stability since even the 0.7.x days, so I'm confident that things will
continue to get better as long as people report what they're seeing so it
can get fixed.

Andrew


On Thu, Apr 10, 2014 at 4:08 PM, Alex Boisvert <al...@gmail.com>wrote:

> I'll provide answers from our own experience at Bizo.  We've been using
> Spark for 1+ year now and have found it generally better than previous
> approaches (Hadoop + Hive mostly).
>
>
>
> On Thu, Apr 10, 2014 at 7:11 AM, Andras Nemeth <
> andras.nemeth@lynxanalytics.com> wrote:
>
>> I. Is it too much magic? Lots of things "just work right" in Spark and
>> it's extremely convenient and efficient when it indeed works. But should we
>> be worried that customization is hard if the built in behavior is not quite
>> right for us? Are we to expect hard to track down issues originating from
>> the black box behind the magic?
>>
>
> I think it goes back to understanding Spark's architecture, its design
> constraints and the problems it explicitly set out to address.   If the
> solution to your problems can be easily formulated in terms of the
> map/reduce model, then it's a good choice.  You'll want your
> "customizations" to go with (not against) the grain of the architecture.
>
>
>> II. Is it mature enough? E.g. we've created a pull request<https://github.com/apache/spark/pull/181>which fixes a problem that we were very surprised no one ever stumbled upon
>> before. So that's why I'm asking: is Spark being already used in
>> professional settings? Can one already trust it being reasonably bug free
>> and reliable?
>>
>
> There are lots of ways to use Spark; and not all of the features are
> necessarily at the same level of maturity.   For instance, we put all the
> jars on the main classpath so we've never run into the issue your pull
> request addresses.
>
> We definitely use and rely on Spark on a professional basis.  We have 5+
> spark jobs running nightly on Amazon's EMR, slicing through GBs of data.
> Once we got them working with the proper configuration settings, they have
> been running reliably since.
>
> I would characterize our use of Spark as a "better Hadoop", in the sense
> that we use it for batch processing only, no streaming yet.   We're happy
> it performs better than Hadoop but we don't require/rely on its memory
> caching features.  In fact, for most of our jobs it would simplify our
> lives if Spark wouldn't cache so many things in memory since it would make
> configuration/tuning a lot simpler and jobs would run successfully on the
> first try instead of having to tweak things (# of partitions and such).
>
> So, to the concrete issues. Sorry for the long mail, and let me know if I
>> should break this out into more threads or if there is some other way to
>> have this discussion...
>>
>> 1. Memory management
>> The general direction of these questions is whether it's possible to take
>> RDD caching related memory management more into our own hands as LRU
>> eviction is nice most of the time but can be very suboptimal in some of our
>> use cases.
>> A. Somehow prioritize cached RDDs, E.g. mark some "essential" that one
>> really wants to keep. I'm fine with going down in flames if I mark too much
>> data essential.
>> B. Memory "reflection": can you pragmatically get the memory size of a
>> cached rdd and memory sizes available in total/per executor? If we could do
>> this we could indirectly avoid automatic evictions of things we might
>> really want to keep in memory.
>> C. Evictions caused by RDD partitions on the driver. I had a setup with
>> huge worker memory and smallish memory on the driver JVM. To my surprise,
>> the system started to cache RDD partitions on the driver as well. As the
>> driver ran out of memory I started to see evictions while there were still
>> plenty of space on workers. This resulted in lengthy recomputations. Can
>> this be avoided somehow?
>> D. Broadcasts. Is it possible to get rid of a broadcast manually, without
>> waiting for the LRU eviction taking care of it? Can you tell the size of a
>> broadcast programmatically?
>>
>>
>> 2. Akka lost connections
>> We have quite often experienced lost executors due to akka exceptions -
>> mostly connection lost or similar. It seems to happen when an executor gets
>> extremely busy with some CPU intensive work. Our hypothesis is that akka
>> network threads get starved and the executor fails to respond within
>> timeout limits. Is this plausible? If yes, what can we do with it?
>>
>
> We've seen these as well.  In our case, increasing the akka timeouts and
> framesize helped a lot.
>
> e.g. spark.akka.{timeout, askTimeout, lookupTimeout, frameSize}
>
>
>>
>> In general, these are scary errors in the sense that they come from the
>> very core of the framework and it's hard to link it to something we do in
>> our own code, and thus hard to find a fix. So a question more for the
>> community: how often do you end up scratching your head about cases where
>> spark magic doesn't work perfectly?
>>
>
> For us, this happens most often for jobs processing TBs of data (instead
> of GBs)... which is frustrating of course because these jobs cost a lot
> more in $$$ + time to run/debug/diagnose than smaller jobs.
>
> It means we have to comb the logs to understand what happened, interpret
> stack traces, dump memory / object allocations, read Spark source to
> formulate hypothesis about what went wrong and then trial + error to get to
> a configuration that works.   Again, if Spark had better defaults and more
> conservative execution model (rely less on in-memory caching of RDDs and
> associated metadata, keeping large communication buffers on the heap,
> etc.), it would definitely simplify our lives.
>
> (Though I recognize that others might use Spark very differently and that
> these defaults and conservative behavior might not please everybody.)
>
> Hopefully this is the kind of feedback you were looking for...
>
>
>> 3. Recalculation of cached rdds
>> I see the following scenario happening. I load two RDDs A,B from disk,
>> cache them and then do some jobs on them, at the very least a count on
>> each. After these jobs are done I see on the storage panel that 100% of
>> these RDDs are cached in memory.
>>
>> Then I create a third RDD C which is created by multiple joins and maps
>> from A and B, also cache it and start a job on C. When I do this I still
>> see A and B completely cached and also see C slowly getting more and more
>> cached. This is all fine and good, but in the meanwhile I see stages
>> running on the UI that point to code which is used to load A and B. How is
>> this possible? Am I misunderstanding how cached RDDs should behave?
>>
>> And again the general question - how can one debug such issues?
>>
>> 4. Shuffle on disk
>> Is it true - I couldn't find it in official docs, but did see this
>> mentioned in various threads - that shuffle _always_ hits disk?
>> (Disregarding OS caches.) Why is this the case? Are you planning to add a
>> function to do shuffle in memory or are there some intrinsic reasons for
>> this to be impossible?
>>
>>
>> Sorry again for the giant mail, and thanks for any insights!
>>
>> Andras
>>
>>
>>
>

Re: Spark - ready for prime time?

Posted by Alex Boisvert <al...@gmail.com>.
I'll provide answers from our own experience at Bizo.  We've been using
Spark for 1+ year now and have found it generally better than previous
approaches (Hadoop + Hive mostly).


On Thu, Apr 10, 2014 at 7:11 AM, Andras Nemeth <
andras.nemeth@lynxanalytics.com> wrote:

> I. Is it too much magic? Lots of things "just work right" in Spark and
> it's extremely convenient and efficient when it indeed works. But should we
> be worried that customization is hard if the built in behavior is not quite
> right for us? Are we to expect hard to track down issues originating from
> the black box behind the magic?
>

I think it goes back to understanding Spark's architecture, its design
constraints and the problems it explicitly set out to address.   If the
solution to your problems can be easily formulated in terms of the
map/reduce model, then it's a good choice.  You'll want your
"customizations" to go with (not against) the grain of the architecture.


> II. Is it mature enough? E.g. we've created a pull request<https://github.com/apache/spark/pull/181>which fixes a problem that we were very surprised no one ever stumbled upon
> before. So that's why I'm asking: is Spark being already used in
> professional settings? Can one already trust it being reasonably bug free
> and reliable?
>

There are lots of ways to use Spark; and not all of the features are
necessarily at the same level of maturity.   For instance, we put all the
jars on the main classpath so we've never run into the issue your pull
request addresses.

We definitely use and rely on Spark on a professional basis.  We have 5+
Spark jobs running nightly on Amazon's EMR, slicing through GBs of data.
Once we got them working with the proper configuration settings, they have
been running reliably since.

I would characterize our use of Spark as a "better Hadoop", in the sense
that we use it for batch processing only, no streaming yet.   We're happy
it performs better than Hadoop but we don't require/rely on its memory
caching features.  In fact, for most of our jobs it would simplify our
lives if Spark wouldn't cache so many things in memory since it would make
configuration/tuning a lot simpler and jobs would run successfully on the
first try instead of having to tweak things (# of partitions and such).

So, to the concrete issues. Sorry for the long mail, and let me know if I
> should break this out into more threads or if there is some other way to
> have this discussion...
>
> 1. Memory management
> The general direction of these questions is whether it's possible to take
> RDD caching related memory management more into our own hands as LRU
> eviction is nice most of the time but can be very suboptimal in some of our
> use cases.
> A. Somehow prioritize cached RDDs, E.g. mark some "essential" that one
> really wants to keep. I'm fine with going down in flames if I mark too much
> data essential.
> B. Memory "reflection": can you pragmatically get the memory size of a
> cached rdd and memory sizes available in total/per executor? If we could do
> this we could indirectly avoid automatic evictions of things we might
> really want to keep in memory.
> C. Evictions caused by RDD partitions on the driver. I had a setup with
> huge worker memory and smallish memory on the driver JVM. To my surprise,
> the system started to cache RDD partitions on the driver as well. As the
> driver ran out of memory I started to see evictions while there were still
> plenty of space on workers. This resulted in lengthy recomputations. Can
> this be avoided somehow?
> D. Broadcasts. Is it possible to get rid of a broadcast manually, without
> waiting for the LRU eviction taking care of it? Can you tell the size of a
> broadcast programmatically?
>
>
> 2. Akka lost connections
> We have quite often experienced lost executors due to akka exceptions -
> mostly connection lost or similar. It seems to happen when an executor gets
> extremely busy with some CPU intensive work. Our hypothesis is that akka
> network threads get starved and the executor fails to respond within
> timeout limits. Is this plausible? If yes, what can we do with it?
>

We've seen these as well.  In our case, increasing the akka timeouts and
framesize helped a lot.

e.g. spark.akka.{timeout, askTimeout, lookupTimeout, frameSize}
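
A rough sketch of what that looks like with SparkConf (the values are made
up; for these 0.9/1.0-era properties the timeouts are in seconds and
frameSize is in MB):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("akka-tuned-job")
  .set("spark.akka.timeout", "300")        // seconds
  .set("spark.akka.askTimeout", "120")     // seconds
  .set("spark.akka.lookupTimeout", "120")  // seconds
  .set("spark.akka.frameSize", "64")       // MB, for large task results/statuses
val sc = new SparkContext(conf)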


>
> In general, these are scary errors in the sense that they come from the
> very core of the framework and it's hard to link it to something we do in
> our own code, and thus hard to find a fix. So a question more for the
> community: how often do you end up scratching your head about cases where
> spark magic doesn't work perfectly?
>

For us, this happens most often for jobs processing TBs of data (instead of
GBs)... which is frustrating of course because these jobs cost a lot more
in $$$ + time to run/debug/diagnose than smaller jobs.

It means we have to comb the logs to understand what happened, interpret
stack traces, dump memory / object allocations, read the Spark source to
formulate hypotheses about what went wrong, and then trial + error our way
to a configuration that works.   Again, if Spark had better defaults and a
more conservative execution model (rely less on in-memory caching of RDDs
and associated metadata, keeping large communication buffers on the heap,
etc.), it would definitely simplify our lives.

(Though I recognize that others might use Spark very differently and that
these defaults and conservative behavior might not please everybody.)

Hopefully this is the kind of feedback you were looking for...


> 3. Recalculation of cached rdds
> I see the following scenario happening. I load two RDDs A,B from disk,
> cache them and then do some jobs on them, at the very least a count on
> each. After these jobs are done I see on the storage panel that 100% of
> these RDDs are cached in memory.
>
> Then I create a third RDD C which is created by multiple joins and maps
> from A and B, also cache it and start a job on C. When I do this I still
> see A and B completely cached and also see C slowly getting more and more
> cached. This is all fine and good, but in the meanwhile I see stages
> running on the UI that point to code which is used to load A and B. How is
> this possible? Am I misunderstanding how cached RDDs should behave?
>
> And again the general question - how can one debug such issues?
>
> 4. Shuffle on disk
> Is it true - I couldn't find it in official docs, but did see this
> mentioned in various threads - that shuffle _always_ hits disk?
> (Disregarding OS caches.) Why is this the case? Are you planning to add a
> function to do shuffle in memory or are there some intrinsic reasons for
> this to be impossible?
>
>
> Sorry again for the giant mail, and thanks for any insights!
>
> Andras
>
>
>

Re: Spark - ready for prime time?

Posted by Debasish Das <de...@gmail.com>.
When you say "Spark is one of the forerunners for our technology choice",
what are the other options you are looking into?

I started cross-validation runs on a 40-core, 160 GB Spark job using a
script... I woke up in the morning and none of the jobs had crashed! And the
project just came out of incubation.

I hope Spark keeps evolving as a standalone Akka cluster (an MPI cluster, if
you remember C++ mpiexec :-) where you can plug and play any distributed file
system (HDFS, ...) or distributed caching system (HBase, Cassandra, ...).

I am also confident that Spark as a standalone Akka cluster can serve
analytics-driven, scalable frontend apps... and by analytics I don't mean
SQL analytics, but predictive analytics...



On Thu, Apr 10, 2014 at 7:11 AM, Andras Nemeth <
andras.nemeth@lynxanalytics.com> wrote:

> Hello Spark Users,
>
> With the recent graduation of Spark to a top level project (grats, btw!),
> maybe a well timed question. :)
>
> We are at the very beginning of a large scale big data project and after
> two months of exploration work we'd like to settle on the technologies to
> use, roll up our sleeves and start to build the system.
>
> Spark is one of the forerunners for our technology choice.
>
> My question in essence is whether it's a good idea or is Spark too
> 'experimental' just yet to bet our lives (well, the project's life) on it.
>
> The benefits of choosing Spark are numerous and I guess all too obvious
> for this audience - e.g. we love its powerful abstraction, ease of
> development and the potential for using a single system for serving and
> manipulating huge amount of data.
>
> This email aims to ask about the risks. I enlist concrete issues we've
> encountered below, but basically my concern boils down to two philosophical
> points:
> I. Is it too much magic? Lots of things "just work right" in Spark and
> it's extremely convenient and efficient when it indeed works. But should we
> be worried that customization is hard if the built in behavior is not quite
> right for us? Are we to expect hard to track down issues originating from
> the black box behind the magic?
> II. Is it mature enough? E.g. we've created a pull request<https://github.com/apache/spark/pull/181>which fixes a problem that we were very surprised no one ever stumbled upon
> before. So that's why I'm asking: is Spark being already used in
> professional settings? Can one already trust it being reasonably bug free
> and reliable?
>
> I know I'm asking a biased audience, but that's fine, as I want to be
> convinced. :)
>
> So, to the concrete issues. Sorry for the long mail, and let me know if I
> should break this out into more threads or if there is some other way to
> have this discussion...
>
> 1. Memory management
> The general direction of these questions is whether it's possible to take
> RDD caching related memory management more into our own hands as LRU
> eviction is nice most of the time but can be very suboptimal in some of our
> use cases.
> A. Somehow prioritize cached RDDs, E.g. mark some "essential" that one
> really wants to keep. I'm fine with going down in flames if I mark too much
> data essential.
> B. Memory "reflection": can you pragmatically get the memory size of a
> cached rdd and memory sizes available in total/per executor? If we could do
> this we could indirectly avoid automatic evictions of things we might
> really want to keep in memory.
> C. Evictions caused by RDD partitions on the driver. I had a setup with
> huge worker memory and smallish memory on the driver JVM. To my surprise,
> the system started to cache RDD partitions on the driver as well. As the
> driver ran out of memory I started to see evictions while there were still
> plenty of space on workers. This resulted in lengthy recomputations. Can
> this be avoided somehow?
> D. Broadcasts. Is it possible to get rid of a broadcast manually, without
> waiting for the LRU eviction taking care of it? Can you tell the size of a
> broadcast programmatically?
>
>
> 2. Akka lost connections
> We have quite often experienced lost executors due to akka exceptions -
> mostly connection lost or similar. It seems to happen when an executor gets
> extremely busy with some CPU intensive work. Our hypothesis is that akka
> network threads get starved and the executor fails to respond within
> timeout limits. Is this plausible? If yes, what can we do with it?
>
> In general, these are scary errors in the sense that they come from the
> very core of the framework and it's hard to link it to something we do in
> our own code, and thus hard to find a fix. So a question more for the
> community: how often do you end up scratching your head about cases where
> spark magic doesn't work perfectly?
>
>
> 3. Recalculation of cached rdds
> I see the following scenario happening. I load two RDDs A,B from disk,
> cache them and then do some jobs on them, at the very least a count on
> each. After these jobs are done I see on the storage panel that 100% of
> these RDDs are cached in memory.
>
> Then I create a third RDD C which is created by multiple joins and maps
> from A and B, also cache it and start a job on C. When I do this I still
> see A and B completely cached and also see C slowly getting more and more
> cached. This is all fine and good, but in the meanwhile I see stages
> running on the UI that point to code which is used to load A and B. How is
> this possible? Am I misunderstanding how cached RDDs should behave?
>
> And again the general question - how can one debug such issues?
>
> 4. Shuffle on disk
> Is it true - I couldn't find it in official docs, but did see this
> mentioned in various threads - that shuffle _always_ hits disk?
> (Disregarding OS caches.) Why is this the case? Are you planning to add a
> function to do shuffle in memory or are there some intrinsic reasons for
> this to be impossible?
>
>
> Sorry again for the giant mail, and thanks for any insights!
>
> Andras
>
>
>

Re: Spark - ready for prime time?

Posted by Dean Wampler <de...@gmail.com>.
Spark has been endorsed by Cloudera as the successor to MapReduce. That
says a lot...


On Thu, Apr 10, 2014 at 10:11 AM, Andras Nemeth <
andras.nemeth@lynxanalytics.com> wrote:

> Hello Spark Users,
>
> With the recent graduation of Spark to a top level project (grats, btw!),
> maybe a well timed question. :)
>
> We are at the very beginning of a large scale big data project and after
> two months of exploration work we'd like to settle on the technologies to
> use, roll up our sleeves and start to build the system.
>
> Spark is one of the forerunners for our technology choice.
>
> My question in essence is whether it's a good idea or is Spark too
> 'experimental' just yet to bet our lives (well, the project's life) on it.
>
> The benefits of choosing Spark are numerous and I guess all too obvious
> for this audience - e.g. we love its powerful abstraction, ease of
> development and the potential for using a single system for serving and
> manipulating huge amount of data.
>
> This email aims to ask about the risks. I enlist concrete issues we've
> encountered below, but basically my concern boils down to two philosophical
> points:
> I. Is it too much magic? Lots of things "just work right" in Spark and
> it's extremely convenient and efficient when it indeed works. But should we
> be worried that customization is hard if the built in behavior is not quite
> right for us? Are we to expect hard to track down issues originating from
> the black box behind the magic?
> II. Is it mature enough? E.g. we've created a pull request<https://github.com/apache/spark/pull/181>which fixes a problem that we were very surprised no one ever stumbled upon
> before. So that's why I'm asking: is Spark being already used in
> professional settings? Can one already trust it being reasonably bug free
> and reliable?
>
> I know I'm asking a biased audience, but that's fine, as I want to be
> convinced. :)
>
> So, to the concrete issues. Sorry for the long mail, and let me know if I
> should break this out into more threads or if there is some other way to
> have this discussion...
>
> 1. Memory management
> The general direction of these questions is whether it's possible to take
> RDD caching related memory management more into our own hands as LRU
> eviction is nice most of the time but can be very suboptimal in some of our
> use cases.
> A. Somehow prioritize cached RDDs, E.g. mark some "essential" that one
> really wants to keep. I'm fine with going down in flames if I mark too much
> data essential.
> B. Memory "reflection": can you pragmatically get the memory size of a
> cached rdd and memory sizes available in total/per executor? If we could do
> this we could indirectly avoid automatic evictions of things we might
> really want to keep in memory.
> C. Evictions caused by RDD partitions on the driver. I had a setup with
> huge worker memory and smallish memory on the driver JVM. To my surprise,
> the system started to cache RDD partitions on the driver as well. As the
> driver ran out of memory I started to see evictions while there were still
> plenty of space on workers. This resulted in lengthy recomputations. Can
> this be avoided somehow?
> D. Broadcasts. Is it possible to get rid of a broadcast manually, without
> waiting for the LRU eviction taking care of it? Can you tell the size of a
> broadcast programmatically?
>
>
> 2. Akka lost connections
> We have quite often experienced lost executors due to akka exceptions -
> mostly connection lost or similar. It seems to happen when an executor gets
> extremely busy with some CPU intensive work. Our hypothesis is that akka
> network threads get starved and the executor fails to respond within
> timeout limits. Is this plausible? If yes, what can we do with it?
>
> In general, these are scary errors in the sense that they come from the
> very core of the framework and it's hard to link it to something we do in
> our own code, and thus hard to find a fix. So a question more for the
> community: how often do you end up scratching your head about cases where
> spark magic doesn't work perfectly?
>
>
> 3. Recalculation of cached rdds
> I see the following scenario happening. I load two RDDs A,B from disk,
> cache them and then do some jobs on them, at the very least a count on
> each. After these jobs are done I see on the storage panel that 100% of
> these RDDs are cached in memory.
>
> Then I create a third RDD C which is created by multiple joins and maps
> from A and B, also cache it and start a job on C. When I do this I still
> see A and B completely cached and also see C slowly getting more and more
> cached. This is all fine and good, but in the meanwhile I see stages
> running on the UI that point to code which is used to load A and B. How is
> this possible? Am I misunderstanding how cached RDDs should behave?
>
> And again the general question - how can one debug such issues?
>
> 4. Shuffle on disk
> Is it true - I couldn't find it in official docs, but did see this
> mentioned in various threads - that shuffle _always_ hits disk?
> (Disregarding OS caches.) Why is this the case? Are you planning to add a
> function to do shuffle in memory or are there some intrinsic reasons for
> this to be impossible?
>
>
> Sorry again for the giant mail, and thanks for any insights!
>
> Andras
>
>
>


-- 
Dean Wampler, Ph.D.
Typesafe
@deanwampler
http://typesafe.com
http://polyglotprogramming.com