Posted to dev@spark.apache.org by Ian O'Connell <ia...@gmail.com> on 2014/07/11 00:15:44 UTC

sparkSQL thread safe?

Had a few quick questions...

Just wondering: is Spark SQL on master currently expected to be thread safe?

Doing a simple hadoop file -> RDD -> schema RDD -> write parquet will fail
in the reflection code if I run these in a thread pool.
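
For concreteness, a minimal sketch of the pattern I mean, assuming an
existing SparkContext sc and the 1.0-style SQLContext/createSchemaRDD
implicit; the paths and the Record case class are made up:

    import scala.concurrent.{Await, Future}
    import scala.concurrent.duration._
    import scala.concurrent.ExecutionContext.Implicits.global

    case class Record(id: Int, name: String)            // hypothetical schema

    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    import sqlContext.createSchemaRDD                    // implicit RDD[Product] -> SchemaRDD

    val writes = (1 to 4).map { i =>
      Future {
        sc.textFile(s"hdfs:///tmp/in/part-$i")           // made-up input path
          .map(_.split('\t'))
          .map(a => Record(a(0).toInt, a(1)))
          .saveAsParquetFile(s"hdfs:///tmp/out/part-$i") // schema reflection runs here
      }
    }
    Await.result(Future.sequence(writes), 1.hour)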

The SparkSqlSerializer seems to create a new Kryo instance each time it
wants to serialize anything. I got a huge speedup whenever I had a
non-primitive type in my SchemaRDD by using the ResourcePools from Chill to
provide the KryoSerializer to it. (I can open an RB if there is some
reason not to re-use them?)
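
As a rough illustration of the reuse I mean (not the exact change), Chill's
KryoPool hands out pooled, pre-configured Kryo instances instead of building
one per call; the pool size of 10 here is arbitrary:

    import com.twitter.chill.{KryoPool, ScalaKryoInstantiator}

    // Build a pool of reusable Kryo instances backed by byte-array output buffers.
    val kryoPool = KryoPool.withByteArrayOutputStream(10, new ScalaKryoInstantiator)

    // Serialize/deserialize without constructing a fresh Kryo each time.
    val bytes = kryoPool.toBytesWithClass(Seq(1, 2, 3))
    val back  = kryoPool.fromBytes(bytes)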

====

With the Distinct Count operator there are no map-side operations, and
there is a test to check for this. Is there any reason not to do a map-side
combine into a set and then merge the sets later? (Similar to the
approximate distinct count operator.)
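
In plain RDD terms, the map-side combine I have in mind looks roughly like
this (the pairs RDD and its String key/value types are placeholders):

    import org.apache.spark.SparkContext._   // pair-RDD functions

    // pairs: RDD[(String, String)] -- (groupKey, value); names are hypothetical
    val distinctCounts = pairs
      .combineByKey[Set[String]](
        (v: String) => Set(v),                       // create a set on the map side
        (s: Set[String], v: String) => s + v,        // map-side combine
        (a: Set[String], b: Set[String]) => a ++ b)  // merge sets on the reduce side
      .mapValues(_.size)                             // distinct count per key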

===

Another thing while I'm mailing: the 1.0.1 docs have a section like:
"
// Note: Case classes in Scala 2.10 can support only up to 22 fields. To
work around this limit, // you can use custom classes that implement the
Product interface.
"

Which sounds great: we have lots of data in Thrift, so via scrooge
(https://github.com/twitter/scrooge) we ultimately end up with instances of
traits that implement Product. The reflection code, though, appears to look
for the constructor of the class and derive the field types from its
parameters?
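
For reference, this is the shape of a hand-rolled Product the docs seem to
describe; WideRecord and its fields are invented for illustration (a real
one would carry more than 22 fields):

    // A non-case class that still satisfies Product, sidestepping the
    // 22-field case-class limit in Scala 2.10. Only three fields shown.
    class WideRecord(val id: Long, val name: String, val score: Double)
        extends Product with Serializable {
      def productArity: Int = 3
      def productElement(n: Int): Any = n match {
        case 0 => id
        case 1 => name
        case 2 => score
        case _ => throw new IndexOutOfBoundsException(n.toString)
      }
      def canEqual(that: Any): Boolean = that.isInstanceOf[WideRecord]
    }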


Ian.

Re: sparkSQL thread safe?

Posted by Reynold Xin <rx...@databricks.com>.
Ian,

The LZFOutputStream's large byte buffer is sort of annoying. It is much
smaller if you use the Snappy one. The downside of the Snappy one is
slightly less compression (I've seen 10-20% larger sizes).

If we can find a compression scheme implementation that doesn't do very
large buffers, that'd be a good idea too ... let me know if you have any
suggestions.
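
For anyone following along, switching codecs is just a config change; a
minimal sketch, using the fully-qualified codec class name so it works
across 1.x versions:

    import org.apache.spark.SparkConf

    // Use Snappy for IO/shuffle compression instead of LZF; smaller
    // per-stream buffers, at the cost of somewhat larger compressed output.
    val conf = new SparkConf()
      .set("spark.io.compression.codec", "org.apache.spark.io.SnappyCompressionCodec")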

In the future, we plan to make shuffle write to fewer streams at the same
time.



On Sat, Jul 12, 2014 at 7:59 PM, Ian O'Connell <ia...@ianoconnell.com> wrote:

> Thanks for the response Michael
>
> On the first i'm following the JIRA now thanks, not blocker for me but
> would be great to see.
>
> I opened up a PR with the resource pool usage around it. I didn't include
> it in the PR, but a few classes we should probably add as registered in
> kryo for good perf/size:
>     classOf[org.apache.spark.sql.catalyst.expressions.GenericRow],
>     classOf[org.apache.spark.sql.catalyst.expressions.GenericMutableRow],
>     classOf[org.apache.spark.sql.catalyst.expressions.Row],
>     classOf[Array[Object]],
>     scala.collection.immutable.Nil.getClass,
>     scala.collection.immutable.::.getClass,
>     classOf[scala.collection.immutable.::[Any]]
>
> Thanks for adding that distinct btw, great to have it scale more.
>
> On the last, opened the JIRA thanks.
>
> Also more of a sparkCore thing that you might already be aware of, but I
> haven't seen mentioned somewhere and was hitting me(Also if any part of
> this seems wrong to you I'd love to know):
>
> I was getting out of memory doing a bunch of ops against medium(~1TB
> compressed) input sizes with simple things that should spill nicely
> (distinct, reduceByKey(_ + _) ).
>
> Anyway what I came back with(copied from an internal email):
>
> I looked through some heap dumps from the OOM's in spark and found there
> were >10k instances of DiskBlockObjectWriter's each of which were up to
> 300kb in size per active executor. At up to 12 concurrent tasks per host is
> about 33gb of space topping out. The nodes of course were failing before
> this(max mem on our ts cluster per jvm is 25gb).
>
> The memory usage primarily comes from two places, a byte array in
> LZFOutputStream and a byte array in BufferedOutputStream. These are both
> output buffers along the way to disk(so when we are using the former we can
> turn down/disable the latter). These are configured to be 65kb and 100kb
> respectively by default. The former is not a configurable option but is
> static in that library's code.
>
> These come from the ShuffleBlockWriter, that is we get an input stream with
> >10k chunks. When we do operations which require partitioning (say
> distinct, reduceByKey, etc..) it maintains the existing partition count. So
> each task basically opens >10k files, each file handle of which has these
> buffers in place for that task to write to.
>
> Solution put in place(maybe there's a better one?):
>
> Given:
> X: The heap size for an executors JVM
> Y: The number of threads/cores allowed for concurrent execution per host
> Z: The expected overhead of these output streams (currently estimated at
> 65k + size of the output buffer * 1.1 for overheads)
> K: The fraction of memory to allow be used for this overhead (configurable
> parameter, default @ 0.2)
>
> Then, the number of partitions: P = (X / Y / Z) * K
>
> Then inside some of our root sources now:
> -> After assembling the RDD, if numPartitions > P
> -> coalesce to P.
> This won't trigger another shuffle phase, so can easily sit inline to
> source definitions.
>
> The only real down side of this approach i've seen is that it limits the
> number of tasks in this initial map phase which may not be ideal for
> parallelism when loading a large dataset and then filtering heavily. It
> would be more efficient to pass P into the first distinct/reduceByKey call,
> but the user code would have to reference P.
>
>
>
>
>
> On Thu, Jul 10, 2014 at 4:50 PM, Michael Armbrust <mi...@databricks.com>
> wrote:
>
> > Hey Ian,
> >
> > Thanks for bringing these up!  Responses in-line:
> >
> > Just wondering if right now spark sql is expected to be thread safe on
> > > master?
> > > doing a simple hadoop file -> RDD -> schema RDD -> write parquet
> > > will fail in reflection code if i run these in a thread pool.
> > >
> >
> > You are probably hitting SPARK-2178
> > <https://issues.apache.org/jira/browse/SPARK-2178> which is caused by
> > SI-6240 <https://issues.scala-lang.org/browse/SI-6240>.  We have a plan
> to
> > fix this by moving the schema introspection to compile time, using
> macros.
> >
> >
> > > The SparkSqlSerializer, seems to create a new Kryo instance each time
> it
> > > wants to serialize anything. I got a huge speedup when I had any
> > > non-primitive type in my SchemaRDD using the ResourcePool's from Chill
> > for
> > > providing the KryoSerializer to it. (I can open an RB if there is some
> > > reason not to re-use them?)
> > >
> >
> > Sounds like SPARK-2102 <https://issues.apache.org/jira/browse/SPARK-2102
> >.
> >  There is no reason AFAIK to not reuse the instance. A PR would be
> greatly
> > appreciated!
> >
> >
> > > With the Distinct Count operator there is no map-side operations, and a
> > > test to check for this. Is there any reason not to do a map side
> combine
> > > into a set and then merge the sets later? (similar to the approximate
> > > distinct count operator)
> > >
> >
> > Thats just not an optimization that we had implemented yet... but I've
> just
> > done it here <https://github.com/apache/spark/pull/1366> and it'll be in
> > master soon :)
> >
> >
> > > Another thing while i'm mailing.. the 1.0.1 docs have a section like:
> > > "
> > > // Note: Case classes in Scala 2.10 can support only up to 22 fields.
> To
> > > work around this limit, // you can use custom classes that implement
> the
> > > Product interface.
> > > "
> > >
> > > Which sounds great, we have lots of data in thrift.. so via scrooge (
> > > https://github.com/twitter/scrooge), we end up with ultimately
> instances
> > > of
> > > traits which implement product. Though the reflection code appears to
> > look
> > > for the constructor of the class and base the types based on those
> > > parameters?
> >
> >
> > Yeah, thats true that we only look in the constructor at the moment, but
> I
> > don't think there is a really good reason for that (other than I guess we
> > will need to add code to make sure we skip builtin object methods).  If
> you
> > want to open a JIRA, we can try fixing this.
> >
> > Michael
> >
>

Re: sparkSQL thread safe?

Posted by Ian O'Connell <ia...@ianoconnell.com>.
Thanks for the response, Michael.

On the first, I'm following the JIRA now, thanks; it's not a blocker for me
but it would be great to see it fixed.

I opened up a PR with the resource pool usage around it. I didn't include
it in the PR, but here are a few classes we should probably register with
Kryo for good perf/size (a registrator sketch follows the list below):
    classOf[org.apache.spark.sql.catalyst.expressions.GenericRow],
    classOf[org.apache.spark.sql.catalyst.expressions.GenericMutableRow],
    classOf[org.apache.spark.sql.catalyst.expressions.Row],
    classOf[Array[Object]],
    scala.collection.immutable.Nil.getClass,
    scala.collection.immutable.::.getClass,
    classOf[scala.collection.immutable.::[Any]]
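
Something like this hypothetical registrator, wired up via
spark.kryo.registrator, is what I mean (the class name and wiring here are
illustrative, not part of the PR):

    import com.esotericsoftware.kryo.Kryo
    import org.apache.spark.serializer.KryoRegistrator

    class SqlRowRegistrator extends KryoRegistrator {
      override def registerClasses(kryo: Kryo) {
        kryo.register(classOf[org.apache.spark.sql.catalyst.expressions.GenericRow])
        kryo.register(classOf[org.apache.spark.sql.catalyst.expressions.GenericMutableRow])
        kryo.register(classOf[org.apache.spark.sql.catalyst.expressions.Row])
        kryo.register(classOf[Array[Object]])
        kryo.register(scala.collection.immutable.Nil.getClass)
        kryo.register(scala.collection.immutable.::.getClass)
        kryo.register(classOf[scala.collection.immutable.::[Any]])
      }
    }

    // e.g. conf.set("spark.kryo.registrator", "SqlRowRegistrator")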

Thanks for adding that distinct optimization, btw; it's great to have it
scale better.

On the last, I opened the JIRA, thanks.

Also, more of a Spark core thing that you might already be aware of, but I
haven't seen it mentioned anywhere and it was hitting me (also, if any part
of this seems wrong to you I'd love to know):

I was getting out-of-memory errors doing a bunch of ops against medium
(~1TB compressed) input sizes with simple things that should spill nicely
(distinct, reduceByKey(_ + _)).

Anyway, what I came back with (copied from an internal email):

I looked through some heap dumps from the OOMs in Spark and found there
were >10k instances of DiskBlockObjectWriter, each of which was up to 300kb
in size, per active executor. At up to 12 concurrent tasks per host, that
tops out at about 33gb of space. The nodes of course were failing before
this (max mem on our ts cluster per jvm is 25gb).

The memory usage primarily comes from two places: a byte array in
LZFOutputStream and a byte array in BufferedOutputStream. These are both
output buffers along the way to disk (so when we are using the former we
can turn down/disable the latter). They are configured to be 65kb and 100kb
respectively by default. The former is not a configurable option but is
hard-coded in that library.
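
The latter is the shuffle file buffer; turning it down looks roughly like
this (spark.shuffle.file.buffer.kb is the 1.x-era property name, worth
double-checking against the docs for your build):

    import org.apache.spark.SparkConf

    // Shrink the per-partition BufferedOutputStream used by shuffle writers;
    // with LZF already buffering ~65kb internally, a smaller value here helps.
    val conf = new SparkConf().set("spark.shuffle.file.buffer.kb", "32")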

These come from the ShuffleBlockWriter; that is, we get an input stream
with >10k chunks. When we do operations that require partitioning (say
distinct, reduceByKey, etc.) it maintains the existing partition count, so
each task basically opens >10k files, each file handle of which has these
buffers in place for that task to write to.

Solution put in place (maybe there's a better one?):

Given:
X: The heap size for an executor's JVM
Y: The number of threads/cores allowed for concurrent execution per host
Z: The expected overhead of these output streams (currently estimated at
65k + size of the output buffer * 1.1 for overheads)
K: The fraction of memory allowed to be used for this overhead (configurable
parameter, default @ 0.2)

Then, the number of partitions: P = (X / Y / Z) * K

Then inside some of our root sources now:
-> After assembling the RDD, if numPartitions > P
-> coalesce to P.
This won't trigger another shuffle phase, so it can easily sit inline in
the source definitions.
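
Spelled out as code, the rule is roughly the sketch below; the numbers
plugged in for X, Y and the buffer sizes are just the ones from this mail,
and inputRdd stands in for whatever a root source produces:

    // X: executor heap, Y: concurrent tasks, Z: per-stream overhead, K: fraction
    val heapBytes   = 25L * 1024 * 1024 * 1024       // X (25gb heap)
    val tasksPerJvm = 12                              // Y
    val streamBytes = ((65 + 100) * 1024) * 1.1       // Z: LZF + file buffer, +10% overhead
    val fraction    = 0.2                             // K

    val maxPartitions = ((heapBytes / tasksPerJvm / streamBytes) * fraction).toInt  // P

    // Cap the partition count right after assembling the RDD; coalesce
    // (without shuffle=true) does not trigger another shuffle phase.
    val sized =
      if (inputRdd.partitions.size > maxPartitions) inputRdd.coalesce(maxPartitions)
      else inputRdd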

The only real downside of this approach I've seen is that it limits the
number of tasks in the initial map phase, which may not be ideal for
parallelism when loading a large dataset and then filtering heavily. It
would be more efficient to pass P into the first distinct/reduceByKey call,
but the user code would have to reference P.
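
I.e. if the user code can see the cap (maxPartitions from the sketch above,
with records being another placeholder RDD), it can be passed straight into
the first wide operation instead of coalescing afterwards:

    import org.apache.spark.SparkContext._   // pair-RDD implicits

    // Passing the cap directly avoids materializing >10k shuffle partitions first.
    val counts = records.map(w => (w, 1L)).reduceByKey(_ + _, maxPartitions)
    val uniq   = records.distinct(maxPartitions)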





On Thu, Jul 10, 2014 at 4:50 PM, Michael Armbrust <mi...@databricks.com>
wrote:

> Hey Ian,
>
> Thanks for bringing these up!  Responses in-line:
>
> Just wondering if right now spark sql is expected to be thread safe on
> > master?
> > doing a simple hadoop file -> RDD -> schema RDD -> write parquet
> > will fail in reflection code if i run these in a thread pool.
> >
>
> You are probably hitting SPARK-2178
> <https://issues.apache.org/jira/browse/SPARK-2178> which is caused by
> SI-6240 <https://issues.scala-lang.org/browse/SI-6240>.  We have a plan to
> fix this by moving the schema introspection to compile time, using macros.
>
>
> > The SparkSqlSerializer, seems to create a new Kryo instance each time it
> > wants to serialize anything. I got a huge speedup when I had any
> > non-primitive type in my SchemaRDD using the ResourcePool's from Chill
> for
> > providing the KryoSerializer to it. (I can open an RB if there is some
> > reason not to re-use them?)
> >
>
> Sounds like SPARK-2102 <https://issues.apache.org/jira/browse/SPARK-2102>.
>  There is no reason AFAIK to not reuse the instance. A PR would be greatly
> appreciated!
>
>
> > With the Distinct Count operator there is no map-side operations, and a
> > test to check for this. Is there any reason not to do a map side combine
> > into a set and then merge the sets later? (similar to the approximate
> > distinct count operator)
> >
>
> Thats just not an optimization that we had implemented yet... but I've just
> done it here <https://github.com/apache/spark/pull/1366> and it'll be in
> master soon :)
>
>
> > Another thing while i'm mailing.. the 1.0.1 docs have a section like:
> > "
> > // Note: Case classes in Scala 2.10 can support only up to 22 fields. To
> > work around this limit, // you can use custom classes that implement the
> > Product interface.
> > "
> >
> > Which sounds great, we have lots of data in thrift.. so via scrooge (
> > https://github.com/twitter/scrooge), we end up with ultimately instances
> > of
> > traits which implement product. Though the reflection code appears to
> look
> > for the constructor of the class and base the types based on those
> > parameters?
>
>
> Yeah, thats true that we only look in the constructor at the moment, but I
> don't think there is a really good reason for that (other than I guess we
> will need to add code to make sure we skip builtin object methods).  If you
> want to open a JIRA, we can try fixing this.
>
> Michael
>

Re: sparkSQL thread safe?

Posted by Michael Armbrust <mi...@databricks.com>.
Hey Ian,

Thanks for bringing these up!  Responses in-line:

Just wondering if right now spark sql is expected to be thread safe on
> master?
> doing a simple hadoop file -> RDD -> schema RDD -> write parquet
> will fail in reflection code if i run these in a thread pool.
>

You are probably hitting SPARK-2178
<https://issues.apache.org/jira/browse/SPARK-2178> which is caused by
SI-6240 <https://issues.scala-lang.org/browse/SI-6240>.  We have a plan to
fix this by moving the schema introspection to compile time, using macros.


> The SparkSqlSerializer, seems to create a new Kryo instance each time it
> wants to serialize anything. I got a huge speedup when I had any
> non-primitive type in my SchemaRDD using the ResourcePool's from Chill for
> providing the KryoSerializer to it. (I can open an RB if there is some
> reason not to re-use them?)
>

Sounds like SPARK-2102 <https://issues.apache.org/jira/browse/SPARK-2102>.
 There is no reason AFAIK not to reuse the instance. A PR would be greatly
appreciated!


> With the Distinct Count operator there is no map-side operations, and a
> test to check for this. Is there any reason not to do a map side combine
> into a set and then merge the sets later? (similar to the approximate
> distinct count operator)
>

That's just not an optimization that we had implemented yet... but I've just
done it here <https://github.com/apache/spark/pull/1366> and it'll be in
master soon :)


> Another thing while i'm mailing.. the 1.0.1 docs have a section like:
> "
> // Note: Case classes in Scala 2.10 can support only up to 22 fields. To
> work around this limit, // you can use custom classes that implement the
> Product interface.
> "
>
> Which sounds great, we have lots of data in thrift.. so via scrooge (
> https://github.com/twitter/scrooge), we end up with ultimately instances
> of
> traits which implement product. Though the reflection code appears to look
> for the constructor of the class and base the types based on those
> parameters?


Yeah, it's true that we only look at the constructor at the moment, but I
don't think there is a really good reason for that (other than, I guess, we
will need to add code to make sure we skip built-in object methods).  If you
want to open a JIRA, we can try fixing this.

Michael