Posted to dev@spark.apache.org by "Ulanov, Alexander" <al...@hp.com> on 2015/08/20 23:57:49 UTC

Dataframe aggregation with Tungsten unsafe

Dear Spark developers,

I am trying to benchmark the new DataFrame aggregation implemented as part of Project Tungsten and released with Spark 1.4 (I am using the latest Spark from the repo, i.e. 1.5):
https://github.com/apache/spark/pull/5725
It says that the aggregation should be faster because it uses Unsafe to allocate memory and updates values in place. It was also presented at Spark Summit this summer:
http://www.slideshare.net/SparkSummit/deep-dive-into-project-tungsten-josh-rosen
The following enables the new aggregation in the Spark config:
spark.sql.unsafe.enabled=true
spark.unsafe.offHeap=true

I wrote a simple program that aggregates values by key. However, its execution time does not depend on whether the new aggregation is on or off. Could you suggest how I can observe the improvement that the new aggregation provides? Could you write a code snippet that takes advantage of it?

case class Counter(key: Int, value: Double)
val size = 100000000
val partitions = 5
val repetitions = 5
val data = sc.parallelize(1 to size, partitions).map(x => Counter(util.Random.nextInt(size / repetitions), util.Random.nextDouble))
val df = sqlContext.createDataFrame(data)
df.persist()
df.count()
val t = System.nanoTime()
val res = df.groupBy("key").agg(sum("value"))
res.count()
println((System.nanoTime() - t) / 1e9)


Best regards, Alexander

Re: Dataframe aggregation with Tungsten unsafe

Posted by Reynold Xin <rx...@databricks.com>.
BTW one other thing -- don't use count() for benchmarking, since the
optimizer is smart enough to figure out that you don't actually need to run
the sum.


For the purpose of benchmarking, you can use

df.foreach(_ => ())  // no-op action that still forces every row to be computed
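
The timing pattern used throughout this thread (System.nanoTime() before and after an action) can be wrapped in a small helper. This is only a sketch: the names Bench and timed are hypothetical and not part of Spark, and the Spark call in the trailing comment assumes the `df` defined earlier in the thread.

```scala
object Bench {
  // Runs `body` once and returns its result together with the elapsed
  // wall-clock time in seconds. The body must contain an action
  // (e.g. foreach); building a lazy query plan alone measures nothing.
  def timed[A](label: String)(body: => A): (A, Double) = {
    val start = System.nanoTime()
    val result = body
    val seconds = (System.nanoTime() - start) / 1e9
    println(f"$label%s took $seconds%.3f s")
    (result, seconds)
  }
}

// Hypothetical usage in spark-shell:
//   Bench.timed("agg") { df.groupBy("key").sum("value").foreach(_ => ()) }
```

A single run like this is noisy; repeating the measurement a few times in a fresh shell gives a more trustworthy number.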




On Thu, Aug 20, 2015 at 3:31 PM, Reynold Xin <rx...@databricks.com> wrote:

>  I didn't wait long enough earlier. Actually it did finish when I raised
> memory to 8g.
>
> In 1.5 with Tungsten (which should be the same as 1.4 with your unsafe
> flags), the query took 40s with 4G of mem.
>
> In 1.4, it took 195s with 8G of mem.
>
> This is not a scientific benchmark and I only ran it once.
>
>
>
> On Thu, Aug 20, 2015 at 3:22 PM, Reynold Xin <rx...@databricks.com> wrote:
>
>> How did you run this? I couldn't run your query with 4G of RAM in 1.4,
>> but in 1.5 it ran.
>>
>> Also I recommend just dumping the data to parquet on disk to evaluate,
>> rather than using the in-memory cache, which is super slow and we are
>> thinking of removing/replacing with something else.
>>
>>
>> val size = 100000000
>> val partitions = 10
>> val repetitions = 5
>> val data = sc.parallelize(1 to size, partitions).map(x =>
>> (util.Random.nextInt(size / repetitions),
>> util.Random.nextDouble)).toDF("key", "value")
>>
>> data.write.parquet("/scratch/rxin/tmp/alex")
>>
>>
>> val df = sqlContext.read.parquet("/scratch/rxin/tmp/alex")
>> val t = System.nanoTime()
>> val res = df.groupBy("key").agg(sum("value"))
>> res.count()
>> println((System.nanoTime() - t) / 1e9)
>>

RE: Dataframe aggregation with Tungsten unsafe

Posted by "Wang, Yanping" <ya...@intel.com>.
Hi, Reynold and others

I agree with your comments on mid-tenured objects and GC. In fact, dealing with mid-tenured objects is the major challenge for all Java GC implementations.

I am wondering if anyone has tried the -XX:+PrintTenuringDistribution flag to see exactly what the age distribution looks like when your program runs?
My output with -XX:+PrintGCDetails looks like the listing below (Oracle JDK 8 update 60, http://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html):

Objects of age 1-5 are young; those of age 13, 14, and 15 are old.
The objects in between have to be copied multiple times before they die, and once they reach the old regions they normally need a major GC to clean them up.

Desired survivor size 2583691264 bytes, new threshold 15 (max 15)
- age   1:   13474960 bytes,   13474960 total
- age   2:    2815592 bytes,   16290552 total
- age   3:     632784 bytes,   16923336 total
- age   4:     428432 bytes,   17351768 total
- age   5:     648696 bytes,   18000464 total
- age   6:     572328 bytes,   18572792 total
- age   7:     549216 bytes,   19122008 total
- age   8:     539544 bytes,   19661552 total
- age   9:     422256 bytes,   20083808 total
- age  10:     552928 bytes,   20636736 total
- age  11:     430464 bytes,   21067200 total
- age  12:     753320 bytes,   21820520 total
- age  13:     230864 bytes,   22051384 total
- age  14:     276288 bytes,   22327672 total
- age  15:     809272 bytes,   23136944 total

I'd love to see what other people's object age distributions look like. Once we know the age distribution for a particular use case, we can find ways to avoid Full GC. Full GC is expensive because both CMS and G1 Full GC are single-threaded. GC tuning nowadays is largely a matter of trying to avoid Full GC completely.
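
To compare age distributions across runs, the "- age N: X bytes, Y total" lines can be summarized per age band. The following is a minimal sketch, assuming the log format shown in the listing above; TenuringReport, AgeBucket, parse, and bytesInRange are hypothetical names introduced only for illustration.

```scala
// One survivor-age line from -XX:+PrintTenuringDistribution output.
final case class AgeBucket(age: Int, bytes: Long)

object TenuringReport {
  // Matches lines such as "- age   1:   13474960 bytes,   13474960 total".
  private val AgeLine = """-\s*age\s+(\d+):\s+(\d+)\s+bytes,\s+(\d+)\s+total""".r

  // Extracts (age, bytes) pairs and ignores every other line of the GC log.
  def parse(log: String): Seq[AgeBucket] =
    log.split("\n").toSeq.flatMap { line =>
      AgeLine.findFirstMatchIn(line).map(m => AgeBucket(m.group(1).toInt, m.group(2).toLong))
    }

  // Total bytes held by objects whose age lies in [from, to].
  def bytesInRange(buckets: Seq[AgeBucket], from: Int, to: Int): Long =
    buckets.filter(b => b.age >= from && b.age <= to).map(_.bytes).sum
}
```

For example, bytesInRange(buckets, 6, 12) would give the bytes held by the "middle" ages discussed above.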

Thanks
-yanping

From: Reynold Xin [mailto:rxin@databricks.com]
Sent: Tuesday, August 25, 2015 6:05 PM
To: Ulanov, Alexander
Cc: dev@spark.apache.org
Subject: Re: Dataframe aggregation with Tungsten unsafe

There is a lot of GC activity due to the non-code-gen path being sloppy about garbage creation. This is not actually what happens, but just as an example:

rdd.map { i: Int => i + 1 }

This under the hood becomes a closure that boxes on every input and every output, creating two extra objects.

The reality is more complicated than this -- but here's a simpler view of what happens with GC in these cases. You might've heard from other places that the JVM is very efficient about transient object allocations. That is true when you look at these allocations in isolation, but unfortunately not true when you look at them in aggregate.

First, due to the way the iterator interface is constructed, it is hard for the JIT compiler to on-stack allocate these objects. Then two things happen:

1. They pile up and cause more young gen GCs to happen.
2. After a few young gen GCs, some mid-tenured objects (e.g. an aggregation map) get copied into the old gen, and eventually require a full GC to free them. Full GCs are much more expensive than young gen GCs (they usually involve copying all the data in the old gen).

So the more garbage is created -> the more frequently full GC happens.

The more long-lived objects in the old gen (e.g. cache) -> the more expensive full GC is.



On Tue, Aug 25, 2015 at 5:19 PM, Ulanov, Alexander <al...@hp.com> wrote:
Thank you for the explanation. The size of the 100M data is ~1.4GB in memory and each worker has 32GB of memory. There seems to be a lot of free memory available. I wonder how Spark can hit GC with such a setup?

Reynold Xin <rx...@databricks.com>

On Fri, Aug 21, 2015 at 11:07 AM, Ulanov, Alexander <al...@hp.com> wrote:

It seems that there is a nice improvement with Tungsten enabled when the data is persisted in memory: 2x and 3x. However, the improvement is smaller for Parquet: 1.5x. Interestingly, with Tungsten enabled, the performance of aggregation over in-memory data and over Parquet data is similar. Could anyone comment on this? It seems counterintuitive to me.

Local performance was not as good as Reynold's. I got around 1.5x; he got 5x. However, local mode is not interesting.


I think a large part of that is coming from the pressure created by JVM GC. Putting more data in-memory makes GC worse, unless GC is well tuned.




Re: Dataframe aggregation with Tungsten unsafe

Posted by Reynold Xin <rx...@databricks.com>.
There is a lot of GC activity due to the non-code-gen path being sloppy
about garbage creation. This is not actually what happens, but just as an
example:

rdd.map { i: Int => i + 1 }

This under the hood becomes a closure that boxes on every input and every
output, creating two extra objects.
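
As a rough illustration of the boxing described above (not Spark's actual code path), the sketch below contrasts a generic iterator pipeline with a primitive loop. BoxingSketch and its methods are hypothetical names; the assumption is that Iterator[A] in the Scala standard library is generic and unspecialized, so Ints normally pass through it as java.lang.Integer unless the JIT manages to eliminate the allocation.

```scala
object BoxingSketch {
  // Generic path: each Int is handed through the erased Iterator[A]
  // interface, so values are boxed on the way in and out of `map`.
  def viaIterator(xs: Array[Int]): Long =
    xs.iterator.map(i => i + 1).foldLeft(0L)((acc, i) => acc + i)

  // Primitive path: a while loop over Array[Int] works on raw ints
  // and creates no per-element objects.
  def viaLoop(xs: Array[Int]): Long = {
    var acc = 0L
    var idx = 0
    while (idx < xs.length) {
      acc += xs(idx) + 1
      idx += 1
    }
    acc
  }
}
```

Both methods compute the same sum; the difference is only in how much short-lived garbage the first one may hand to the young generation.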

The reality is more complicated than this -- but here's a simpler view of
what happens with GC in these cases. You might've heard from other places
that the JVM is very efficient about transient object allocations. That is
true when you look at these allocations in isolation, but unfortunately not
true when you look at them in aggregate.

First, due to the way the iterator interface is constructed, it is hard for
the JIT compiler to on-stack allocate these objects. Then two things happen:

1. They pile up and cause more young gen GCs to happen.
2. After a few young gen GCs, some mid-tenured objects (e.g. an aggregation
map) get copied into the old gen, and eventually require a full GC to free
them. Full GCs are much more expensive than young gen GCs (they usually
involve copying all the data in the old gen).

So the more garbage is created -> the more frequently full GC
happens.

The more long-lived objects in the old gen (e.g. cache) -> the more
expensive full GC is.




Re: Dataframe aggregation with Tungsten unsafe

Posted by "Ulanov, Alexander" <al...@hp.com>.
Thank you for the explanation. The size of the 100M data is ~1.4GB in memory and each worker has 32GB of memory. There seems to be a lot of free memory available. I wonder how Spark can hit GC with such a setup?







Re: Dataframe aggregation with Tungsten unsafe

Posted by Reynold Xin <rx...@databricks.com>.
On Fri, Aug 21, 2015 at 11:07 AM, Ulanov, Alexander <alexander.ulanov@hp.com> wrote:

>
>
> It seems that there is a nice improvement with Tungsten enabled when the
> data is persisted in memory: 2x and 3x. However, the improvement is smaller
> for Parquet: 1.5x. Interestingly, with Tungsten enabled, the performance of
> aggregation over in-memory data and over Parquet data is similar.
> Could anyone comment on this? It seems counterintuitive to me.
>
>
>
> Local performance was not as good as Reynold's. I got around 1.5x; he
> got 5x. However, local mode is not interesting.
>
>
>
>
I think a large part of that is coming from the pressure created by JVM GC.
Putting more data in-memory makes GC worse, unless GC is well tuned.

RE: Dataframe aggregation with Tungsten unsafe

Posted by "Ulanov, Alexander" <al...@hp.com>.
I've run a few experiments in different settings, based on the same code that you used.
1) Created two datasets in HDFS on a cluster of 5 worker nodes and copied them to the local fs:
val size = 100000000
val partitions = 10
val repetitions = 5
val data = sc.parallelize(1 to size, partitions).map(x => (util.Random.nextInt(size / repetitions), util.Random.nextDouble)).toDF("key", "value")
data.write.parquet("hdfs://alex")
data.write.parquet("/home/alex")
// 10% sample with replacement, i.e. the 10M-row dataset
val sample = data.sample(true, 0.1)
sample.write.parquet("hdfs://alex-10m")
sample.write.parquet("/home/alex-10m")
2) Ran the following code in local mode (spark-shell --master local) and cluster mode (5 nodes with 1 worker each):
val df = sqlContext.read.parquet("data")
val t = System.nanoTime()
df.groupBy("key").sum("value").queryExecution.toRdd.count()
println((System.nanoTime() - t) / 1e9)
3) Ran the same code in local and cluster mode with the data persisted in memory:
val df = sqlContext.read.parquet("data")
df.persist
df.foreach { x => {} }
val t = System.nanoTime()
df.groupBy("key").sum("value").queryExecution.toRdd.count()
println((System.nanoTime() - t) / 1e9)

In both of the cases above, Tungsten was switched on or off with:
sqlContext.setConf("spark.sql.tungsten.enabled", "true")  // or "false"
Each experiment was run in a new shell. Below are the results:

Data size  Mode     Storage  Tungsten disabled (s)  Tungsten enabled (s)
10M        Cluster  Parquet   9.6                    7.4
10M        Cluster  Persist  10.9                    5.1
10M        Local    Parquet  57.7                   35.8
10M        Local    Persist  61.9                   31.4
100M       Cluster  Parquet  25.4                   18.8
100M       Cluster  Persist  48.6                   14.8


Hardware: 6 nodes, each with 2x Xeon X5650 @ 2.67 GHz and 32 GB RAM; 1 master, 5 workers. Local mode: one node.

It seems that there is a nice improvement with Tungsten enabled when the data is persisted in memory: 2x and 3x. However, the improvement is smaller for Parquet: 1.5x. Interestingly, with Tungsten enabled, the performance of aggregation over in-memory data and over Parquet data is similar. Could anyone comment on this? It seems counterintuitive to me.

Local performance was not as good as Reynold's. I got around 1.5x; he got 5x. However, local mode is not interesting.
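
The speedup figures quoted above are simply the ratio of the "Tungsten disabled" time to the "Tungsten enabled" time. A small sketch that recomputes them from the reported measurements follows; Run and SpeedupTable are hypothetical names, and the numbers are copied from the results in this message.

```scala
// One measured configuration: times are in seconds, as printed by the benchmark.
final case class Run(size: String, mode: String, storage: String, off: Double, on: Double) {
  // Speedup = time with Tungsten disabled / time with Tungsten enabled.
  def speedup: Double = off / on
}

object SpeedupTable {
  val runs: Seq[Run] = Seq(
    Run("10M", "Cluster", "Parquet", 9.6, 7.4),
    Run("10M", "Cluster", "Persist", 10.9, 5.1),
    Run("10M", "Local", "Parquet", 57.7, 35.8),
    Run("10M", "Local", "Persist", 61.9, 31.4),
    Run("100M", "Cluster", "Parquet", 25.4, 18.8),
    Run("100M", "Cluster", "Persist", 48.6, 14.8)
  )

  // One formatted line per configuration, e.g. for printing in the shell.
  def report: Seq[String] =
    runs.map(r => f"${r.size}%-5s ${r.mode}%-8s ${r.storage}%-8s ${r.speedup}%.2fx")
}
```

Calling SpeedupTable.report.foreach(println) prints one ratio per row of the results.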


From: Reynold Xin [mailto:rxin@databricks.com]
Sent: Thursday, August 20, 2015 9:24 PM
To: Ulanov, Alexander
Cc: dev@spark.apache.org
Subject: Re: Dataframe aggregation with Tungsten unsafe

Not sure what's going on or how you measure the time, but the difference here is pretty big when I test on my laptop. Maybe you set the wrong config variables? (spark.sql.* are SQL variables that you set in sqlContext.setConf -- and in 1.5, they are consolidated into a single flag: spark.sql.tungsten.enabled. See below.)


I ran with a 10m dataset (created by calling sample(true, 0.1) on the 100m dataset), since the 100m one takes too long when tungsten is off on my laptop so I didn't wait. (40s - 50s with Tungsten on)


val df = sqlContext.read.parquet("/scratch/rxin/tmp/alex-10m")

val t = System.nanoTime()
df.groupBy("key").sum("value").queryExecution.toRdd.count()
println((System.nanoTime() - t) / 1e9)


On 1.5, with 8g driver memory and 8 cores:

5.48951

sqlContext.setConf("spark.sql.tungsten.enabled", "false")

run it again, and took 25.127962.


On 1.4, with 8g driver memory and 8 cores: 25.583473


It's also possible that the benefit is less when you have infinite amount of memory (relative to the tiny dataset size) and as a result GC happens less.


On Thu, Aug 20, 2015 at 7:00 PM, Ulanov, Alexander <al...@hp.com> wrote:
Did git pull :)

Now I do get the difference in time between on/off Tungsten unsafe: it is 24-25 seconds (unsafe on) vs 32-26 seconds (unsafe off) for the example below.

Why am I not getting the improvement advertised at Spark Summit (slide 23)?
http://www.slideshare.net/SparkSummit/deep-dive-into-project-tungsten-josh-rosen

My dataset is 100M rows; is it big enough to get the improvement? Am I using aggregate correctly?


case class Counter(key: Int, value: Double)
val size = 100000000
val partitions = 5
val repetitions = 5
val data = sc.parallelize(1 to size, partitions).map(x => Counter(util.Random.nextInt(size / repetitions), util.Random.nextDouble))
val df = sqlContext.createDataFrame(data)
df.persist()
df.foreach { x => {} }
val t = System.nanoTime()
val res = df.groupBy("key").agg(sum("value"))
res.foreach { x => {} }
println((System.nanoTime() - t) / 1e9)

Unsafe on:
spark.sql.codegen       true
spark.sql.unsafe.enabled        true
spark.unsafe.offHeap    true

Unsafe off:
spark.sql.codegen       false
spark.sql.unsafe.enabled        false
spark.unsafe.offHeap    false

From: Reynold Xin [mailto:rxin@databricks.com]
Sent: Thursday, August 20, 2015 5:43 PM
To: Ulanov, Alexander
Cc: dev@spark.apache.org
Subject: Re: Dataframe aggregation with Tungsten unsafe

Please git pull :)


On Thu, Aug 20, 2015 at 5:35 PM, Ulanov, Alexander <al...@hp.com> wrote:
I am using Spark 1.5 cloned from master on June 12. (The aggregate unsafe feature was added to Spark on April 29.)

From: Reynold Xin [mailto:rxin@databricks.com]
Sent: Thursday, August 20, 2015 5:26 PM
To: Ulanov, Alexander
Cc: dev@spark.apache.org
Subject: Re: Dataframe aggregation with Tungsten unsafe

Yes - DataFrame and SQL are the same thing.

Which version are you running? Spark 1.4 doesn't run Janino --- but you have a Janino exception?

On Thu, Aug 20, 2015 at 5:01 PM, Ulanov, Alexander <al...@hp.com> wrote:
When I add the following option:
spark.sql.codegen      true

Spark crashed on "df.count" with a java.util.concurrent.ExecutionException (below). Are you sure that I need to set this flag to get unsafe? It looks like a SQL flag, and I don't use SQL.


java.util.concurrent.ExecutionException: org.codehaus.commons.compiler.CompileException: Line 14, Column 10: Override
         at org.spark-project.guava.util.concurrent.AbstractFuture$Sync.getValue(AbstractFuture.java:306)
         at org.spark-project.guava.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:293)
         at org.spark-project.guava.util.concurrent.AbstractFuture.get(AbstractFuture.java:116)
         at org.spark-project.guava.util.concurrent.Uninterruptibles.getUninterruptibly(Uninterruptibles.java:135)
         at org.spark-project.guava.cache.LocalCache$Segment.getAndRecordStats(LocalCache.java:2410)
         at org.spark-project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2380)
         at org.spark-project.guava.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342)
         at org.spark-project.guava.cache.LocalCache$Segment.get(LocalCache.java:2257)
         at org.spark-project.guava.cache.LocalCache.get(LocalCache.java:4000)
         at org.spark-project.guava.cache.LocalCache.getOrLoad(LocalCache.java:4004)
         at org.spark-project.guava.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4874)
         at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.generate(CodeGenerator.scala:286)
         at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.generate(CodeGenerator.scala:283)
         at org.apache.spark.sql.execution.SparkPlan.newPredicate(SparkPlan.scala:180)
         at org.apache.spark.sql.columnar.InMemoryColumnarTableScan$$anonfun$8.apply(InMemoryColumnarTableScan.scala:277)
         at org.apache.spark.sql.columnar.InMemoryColumnarTableScan$$anonfun$8.apply(InMemoryColumnarTableScan.scala:276)
         at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
         at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
         at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
         at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
         at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
         at org.apache.spark.scheduler.Task.run(Task.scala:70)
         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
         at java.lang.Thread.run(Thread.java:745)
Caused by: org.codehaus.commons.compiler.CompileException: Line 14, Column 10: Override
         at org.codehaus.janino.UnitCompiler.findTypeByName(UnitCompiler.java:6897)
         at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:5331)
         at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:5207)
         at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:5188)
         at org.codehaus.janino.UnitCompiler.access$12600(UnitCompiler.java:185)
         at org.codehaus.janino.UnitCompiler$16.visitReferenceType(UnitCompiler.java:5119)
         at org.codehaus.janino.Java$ReferenceType.accept(Java.java:2880)
         at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:5159)
         at org.codehaus.janino.UnitCompiler.hasAnnotation(UnitCompiler.java:830)
         at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:814)
         at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:794)
         at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:507)
         at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:658)
         at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:662)
         at org.codehaus.janino.UnitCompiler.access$600(UnitCompiler.java:185)
         at org.codehaus.janino.UnitCompiler$2.visitMemberClassDeclaration(UnitCompiler.java:350)
         at org.codehaus.janino.Java$MemberClassDeclaration.accept(Java.java:1035)
         at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:354)
         at org.codehaus.janino.UnitCompiler.compileDeclaredMemberTypes(UnitCompiler.java:769)
         at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:532)
         at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:393)
         at org.codehaus.janino.UnitCompiler.access$400(UnitCompiler.java:185)
         at org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:347)
         at org.codehaus.janino.Java$PackageMemberClassDeclaration.accept(Java.java:1139)
         at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:354)
         at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:322)
         at org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:383)
         at org.codehaus.janino.ClassBodyEvaluator.compileToClass(ClassBodyEvaluator.java:315)
         at org.codehaus.janino.ClassBodyEvaluator.cook(ClassBodyEvaluator.java:233)
         at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:192)
         at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:84)
         at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:77)
         at org.codehaus.janino.ClassBodyEvaluator.<init>(ClassBodyEvaluator.java:72)
         at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.compile(CodeGenerator.scala:246)
         at org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate$.create(GeneratePredicate.scala:64)
         at org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate$.create(GeneratePredicate.scala:32)
         at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:273)
         at org.spark-project.guava.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
         at org.spark-project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
         ... 28 more
Caused by: java.lang.ClassNotFoundException: Override
         at org.apache.spark.repl.ExecutorClassLoader.findClass(ExecutorClassLoader.scala:69)
         at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
         at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
         at java.lang.Class.forName0(Native Method)
         at java.lang.Class.forName(Class.java:270)
         at org.codehaus.janino.ClassLoaderIClassLoader.findIClass(ClassLoaderIClassLoader.java:78)
         at org.codehaus.janino.IClassLoader.loadIClass(IClassLoader.java:254)
         at org.codehaus.janino.UnitCompiler.findTypeByName(UnitCompiler.java:6893)
         ... 66 more
Caused by: java.lang.ClassNotFoundException: Override
         at java.lang.ClassLoader.findClass(ClassLoader.java:531)
         at org.apache.spark.util.ParentClassLoader.findClass(ParentClassLoader.scala:26)
         at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
         at org.apache.spark.util.ParentClassLoader.loadClass(ParentClassLoader.scala:34)
         at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
         at org.apache.spark.util.ParentClassLoader.loadClass(ParentClassLoader.scala:30)
         at org.apache.spark.repl.ExecutorClassLoader.findClass(ExecutorClassLoader.scala:64)
         ... 73 more


From: Reynold Xin [mailto:rxin@databricks.com]
Sent: Thursday, August 20, 2015 4:22 PM
To: Ulanov, Alexander
Cc: dev@spark.apache.org
Subject: Re: Dataframe aggregation with Tungsten unsafe

I think you might need to turn codegen on also in order for the unsafe stuff to work.


On Thu, Aug 20, 2015 at 4:09 PM, Ulanov, Alexander <al...@hp.com> wrote:
Hi Reynold,

Thank you for the suggestion. This code takes around 30 sec on my setup (5 workers with 32GB). My issue is that I don't see a change in time if I unset the unsafe flags. Could you explain why this might happen?


Re: Dataframe aggregation with Tungsten unsafe

Posted by Reynold Xin <rx...@databricks.com>.
Not sure what's going on or how you measure the time, but the difference
here is pretty big when I test on my laptop. Maybe you set the wrong config
variables? (spark.sql.* are SQL variables that you set in
sqlContext.setConf -- and in 1.5, they are consolidated into a single
flag: spark.sql.tungsten.enabled. See below.)


I ran with a 10m dataset (created by calling sample(true, 0.1) on the 100m
dataset), since the 100m one takes too long when tungsten is off on my
laptop so I didn't wait. (40s - 50s with Tungsten on)


val df = sqlContext.read.parquet("/scratch/rxin/tmp/alex-10m")

val t = System.nanoTime()
df.groupBy("key").sum("value").queryExecution.toRdd.count()
println((System.nanoTime() - t) / 1e9)


On 1.5, with 8g driver memory and 8 cores:

5.48951

sqlContext.setConf("spark.sql.tungsten.enabled", "false")

run it again, and took 25.127962.


On 1.4, with 8g driver memory and 8 cores: 25.583473


It's also possible that the benefit is smaller when you have an effectively
unlimited amount of memory (relative to the tiny dataset size), so that GC
happens less often.


On Thu, Aug 20, 2015 at 7:00 PM, Ulanov, Alexander <al...@hp.com>
wrote:

> Did git pull :)
>
>
>
> Now I do see a difference in time between Tungsten unsafe on and off: it is
> 24-25 seconds (unsafe on) vs 32-26 seconds (unsafe off) for the example
> below.
>
>
>
> Why am I not getting the improvement advertised at Spark Summit (slide
> 23)?
>
>
> http://www.slideshare.net/SparkSummit/deep-dive-into-project-tungsten-josh-rosen
>
>
>
> My dataset is 100M rows; is it big enough to see the improvement? Am I
> using aggregate correctly?
>
>
>
>
>
> case class Counter(key: Int, value: Double)
>
> val size = 100000000
>
> val partitions = 5
>
> val repetitions = 5
>
> val data = sc.parallelize(1 to size, partitions).map(x =>
> Counter(util.Random.nextInt(size / repetitions), util.Random.nextDouble))
>
> val df = sqlContext.createDataFrame(data)
>
> df.persist()
>
> df.foreach { x => {} }
>
> val t = System.nanoTime()
>
> val res = df.groupBy("key").agg(sum("value"))
>
> res.foreach { x => {} }
>
> println((System.nanoTime() - t) / 1e9)
>
>
>
> Unsafe on:
>
> spark.sql.codegen       true
>
> spark.sql.unsafe.enabled        true
>
> spark.unsafe.offHeap    true
>
>
>
> Unsafe off:
>
> spark.sql.codegen       false
>
> spark.sql.unsafe.enabled        false
>
> spark.unsafe.offHeap    false
>
>
>

RE: Dataframe aggregation with Tungsten unsafe

Posted by "Ulanov, Alexander" <al...@hp.com>.
Did git pull :)

Now I do see a difference in time between Tungsten unsafe on and off: it is 24-25 seconds (unsafe on) vs 32-26 seconds (unsafe off) for the example below.

Why am I not getting the improvement advertised at Spark Summit (slide 23)?
http://www.slideshare.net/SparkSummit/deep-dive-into-project-tungsten-josh-rosen

My dataset is 100M rows; is it big enough to see the improvement? Am I using aggregate correctly?


case class Counter(key: Int, value: Double)
val size = 100000000
val partitions = 5
val repetitions = 5
val data = sc.parallelize(1 to size, partitions).map(x => Counter(util.Random.nextInt(size / repetitions), util.Random.nextDouble))
val df = sqlContext.createDataFrame(data)
df.persist()
df.foreach { x => {} }
val t = System.nanoTime()
val res = df.groupBy("key").agg(sum("value"))
res.foreach { x => {} }
println((System.nanoTime() - t) / 1e9)

Unsafe on:
spark.sql.codegen       true
spark.sql.unsafe.enabled        true
spark.unsafe.offHeap    true

Unsafe off:
spark.sql.codegen       false
spark.sql.unsafe.enabled        false
spark.unsafe.offHeap    false

From: Reynold Xin [mailto:rxin@databricks.com]
Sent: Thursday, August 20, 2015 5:43 PM
To: Ulanov, Alexander
Cc: dev@spark.apache.org
Subject: Re: Dataframe aggregation with Tungsten unsafe

Please git pull :)


On Thu, Aug 20, 2015 at 5:35 PM, Ulanov, Alexander <al...@hp.com> wrote:
I am using Spark 1.5 cloned from master on June 12. (The aggregate unsafe feature was added to Spark on April 29.)

From: Reynold Xin [mailto:rxin@databricks.com]
Sent: Thursday, August 20, 2015 5:26 PM
To: Ulanov, Alexander
Cc: dev@spark.apache.org
Subject: Re: Dataframe aggregation with Tungsten unsafe

Yes - DataFrame and SQL are the same thing.

Which version are you running? Spark 1.4 doesn't run Janino --- but you have a Janino exception?

On Thu, Aug 20, 2015 at 5:01 PM, Ulanov, Alexander <al...@hp.com> wrote:
When I add the following option:
spark.sql.codegen      true

Spark crashed on “df.count” with a java.util.concurrent.ExecutionException (below). Are you sure that I need to set this flag to get unsafe? It looks like a SQL flag, and I don’t use SQL.


java.util.concurrent.ExecutionException: org.codehaus.commons.compiler.CompileException: Line 14, Column 10: Override
         at org.spark-project.guava.util.concurrent.AbstractFuture$Sync.getValue(AbstractFuture.java:306)
         at org.spark-project.guava.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:293)
         at org.spark-project.guava.util.concurrent.AbstractFuture.get(AbstractFuture.java:116)
         at org.spark-project.guava.util.concurrent.Uninterruptibles.getUninterruptibly(Uninterruptibles.java:135)
         at org.spark-project.guava.cache.LocalCache$Segment.getAndRecordStats(LocalCache.java:2410)
         at org.spark-project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2380)
         at org.spark-project.guava.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342)
         at org.spark-project.guava.cache.LocalCache$Segment.get(LocalCache.java:2257)
         at org.spark-project.guava.cache.LocalCache.get(LocalCache.java:4000)
         at org.spark-project.guava.cache.LocalCache.getOrLoad(LocalCache.java:4004)
         at org.spark-project.guava.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4874)
         at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.generate(CodeGenerator.scala:286)
         at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.generate(CodeGenerator.scala:283)
         at org.apache.spark.sql.execution.SparkPlan.newPredicate(SparkPlan.scala:180)
         at org.apache.spark.sql.columnar.InMemoryColumnarTableScan$$anonfun$8.apply(InMemoryColumnarTableScan.scala:277)
         at org.apache.spark.sql.columnar.InMemoryColumnarTableScan$$anonfun$8.apply(InMemoryColumnarTableScan.scala:276)
         at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
         at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
         at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
         at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
         at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
         at org.apache.spark.scheduler.Task.run(Task.scala:70)
         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
         at java.lang.Thread.run(Thread.java:745)
Caused by: org.codehaus.commons.compiler.CompileException: Line 14, Column 10: Override
         at org.codehaus.janino.UnitCompiler.findTypeByName(UnitCompiler.java:6897)
         at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:5331)
         at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:5207)
         at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:5188)
         at org.codehaus.janino.UnitCompiler.access$12600(UnitCompiler.java:185)
         at org.codehaus.janino.UnitCompiler$16.visitReferenceType(UnitCompiler.java:5119)
         at org.codehaus.janino.Java$ReferenceType.accept(Java.java:2880)
         at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:5159)
         at org.codehaus.janino.UnitCompiler.hasAnnotation(UnitCompiler.java:830)
         at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:814)
         at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:794)
         at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:507)
         at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:658)
         at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:662)
         at org.codehaus.janino.UnitCompiler.access$600(UnitCompiler.java:185)
         at org.codehaus.janino.UnitCompiler$2.visitMemberClassDeclaration(UnitCompiler.java:350)
         at org.codehaus.janino.Java$MemberClassDeclaration.accept(Java.java:1035)
         at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:354)
         at org.codehaus.janino.UnitCompiler.compileDeclaredMemberTypes(UnitCompiler.java:769)
         at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:532)
         at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:393)
         at org.codehaus.janino.UnitCompiler.access$400(UnitCompiler.java:185)
         at org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:347)
         at org.codehaus.janino.Java$PackageMemberClassDeclaration.accept(Java.java:1139)
         at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:354)
         at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:322)
         at org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:383)
         at org.codehaus.janino.ClassBodyEvaluator.compileToClass(ClassBodyEvaluator.java:315)
         at org.codehaus.janino.ClassBodyEvaluator.cook(ClassBodyEvaluator.java:233)
         at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:192)
         at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:84)
         at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:77)
         at org.codehaus.janino.ClassBodyEvaluator.<init>(ClassBodyEvaluator.java:72)
         at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.compile(CodeGenerator.scala:246)
         at org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate$.create(GeneratePredicate.scala:64)
         at org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate$.create(GeneratePredicate.scala:32)
         at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:273)
         at org.spark-project.guava.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
         at org.spark-project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
         ... 28 more
Caused by: java.lang.ClassNotFoundException: Override
         at org.apache.spark.repl.ExecutorClassLoader.findClass(ExecutorClassLoader.scala:69)
         at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
         at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
         at java.lang.Class.forName0(Native Method)
         at java.lang.Class.forName(Class.java:270)
         at org.codehaus.janino.ClassLoaderIClassLoader.findIClass(ClassLoaderIClassLoader.java:78)
         at org.codehaus.janino.IClassLoader.loadIClass(IClassLoader.java:254)
         at org.codehaus.janino.UnitCompiler.findTypeByName(UnitCompiler.java:6893)
         ... 66 more
Caused by: java.lang.ClassNotFoundException: Override
         at java.lang.ClassLoader.findClass(ClassLoader.java:531)
         at org.apache.spark.util.ParentClassLoader.findClass(ParentClassLoader.scala:26)
         at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
         at org.apache.spark.util.ParentClassLoader.loadClass(ParentClassLoader.scala:34)
         at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
         at org.apache.spark.util.ParentClassLoader.loadClass(ParentClassLoader.scala:30)
         at org.apache.spark.repl.ExecutorClassLoader.findClass(ExecutorClassLoader.scala:64)
         ... 73 more


From: Reynold Xin [mailto:rxin@databricks.com]
Sent: Thursday, August 20, 2015 4:22 PM
To: Ulanov, Alexander
Cc: dev@spark.apache.org
Subject: Re: Dataframe aggregation with Tungsten unsafe

I think you might need to turn codegen on also in order for the unsafe stuff to work.


On Thu, Aug 20, 2015 at 4:09 PM, Ulanov, Alexander <al...@hp.com> wrote:
Hi Reynold,

Thank you for the suggestion. This code takes around 30 sec on my setup (5 workers with 32GB). My issue is that I don't see any change in time if I unset the unsafe flags. Could you explain why this might happen?

On Aug 20, 2015, at 15:32, Reynold Xin <rx...@databricks.com> wrote:

I didn't wait long enough earlier. Actually it did finish when I raised memory to 8g.

In 1.5 with Tungsten (which should be the same as 1.4 with your unsafe flags), the query took 40s with 4G of mem.

In 1.4, it took 195s with 8G of mem.

This is not a scientific benchmark and I only ran it once.



On Thu, Aug 20, 2015 at 3:22 PM, Reynold Xin <rx...@databricks.com> wrote:
How did you run this? I couldn't run your query with 4G of RAM in 1.4, but in 1.5 it ran.

Also I recommend just dumping the data to parquet on disk to evaluate, rather than using the in-memory cache, which is super slow and we are thinking of removing/replacing with something else.


val size = 100000000
val partitions = 10
val repetitions = 5
val data = sc.parallelize(1 to size, partitions).map(x => (util.Random.nextInt(size / repetitions), util.Random.nextDouble)).toDF("key", "value")

data.write.parquet("/scratch/rxin/tmp/alex")


val df = sqlContext.read.parquet("/scratch/rxin/tmp/alex")
val t = System.nanoTime()
val res = df.groupBy("key").agg(sum("value"))
res.count()
println((System.nanoTime() - t) / 1e9)

On Thu, Aug 20, 2015 at 2:57 PM, Ulanov, Alexander <al...@hp.com> wrote:
Dear Spark developers,

I am trying to benchmark the new DataFrame aggregation implemented under Project Tungsten and released with Spark 1.4 (I am using the latest Spark from the repo, i.e. 1.5):
https://github.com/apache/spark/pull/5725
It says that the aggregation should be faster because it uses unsafe to allocate memory and updates values in place. It was also presented at Spark Summit this summer:
http://www.slideshare.net/SparkSummit/deep-dive-into-project-tungsten-josh-rosen
The following settings enable the new aggregation in the Spark config:
spark.sql.unsafe.enabled=true
spark.unsafe.offHeap=true

I wrote a simple program that aggregates values by key. However, the execution time does not depend on whether the new aggregation is on or off. Could you suggest how I can observe the improvement that the aggregation provides? Could you write a code snippet that takes advantage of the new aggregation?

case class Counter(key: Int, value: Double)
val size = 100000000
val partitions = 5
val repetitions = 5
val data = sc.parallelize(1 to size, partitions).map(x => Counter(util.Random.nextInt(size / repetitions), util.Random.nextDouble))
val df = sqlContext.createDataFrame(data)
df.persist()
df.count()
val t = System.nanoTime()
val res = df.groupBy("key").agg(sum("value"))
res.count()
println((System.nanoTime() - t) / 1e9)


Best regards, Alexander




Re: Dataframe aggregation with Tungsten unsafe

Posted by Reynold Xin <rx...@databricks.com>.
Please git pull :)


On Thu, Aug 20, 2015 at 5:35 PM, Ulanov, Alexander <al...@hp.com>
wrote:

> I am using Spark 1.5 cloned from master on June 12. (The aggregate unsafe
> feature was added to Spark on April 29.)
>
>
>
> *From:* Reynold Xin [mailto:rxin@databricks.com]
> *Sent:* Thursday, August 20, 2015 5:26 PM
>
> *To:* Ulanov, Alexander
> *Cc:* dev@spark.apache.org
> *Subject:* Re: Dataframe aggregation with Tungsten unsafe
>
>
>
> Yes - DataFrame and SQL are the same thing.
>
>
>
> Which version are you running? Spark 1.4 doesn't run Janino --- but you
> have a Janino exception?
>
>
>
> On Thu, Aug 20, 2015 at 5:01 PM, Ulanov, Alexander <
> alexander.ulanov@hp.com> wrote:
>
> When I add the following option:
>
> spark.sql.codegen      true
>
>
>
> Spark crashed on “df.count” with a java.util.concurrent.ExecutionException
> (below). Are you sure that I need to set this flag to get unsafe? It looks
> like a SQL flag, and I don’t use SQL.
>
>
>
>
>
> java.util.concurrent.ExecutionException:
> org.codehaus.commons.compiler.CompileException: Line 14, Column 10: Override
>
>          at
> org.spark-project.guava.util.concurrent.AbstractFuture$Sync.getValue(AbstractFuture.java:306)
>
>          at
> org.spark-project.guava.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:293)
>
>          at
> org.spark-project.guava.util.concurrent.AbstractFuture.get(AbstractFuture.java:116)
>
>          at
> org.spark-project.guava.util.concurrent.Uninterruptibles.getUninterruptibly(Uninterruptibles.java:135)
>
>          at
> org.spark-project.guava.cache.LocalCache$Segment.getAndRecordStats(LocalCache.java:2410)
>
>          at
> org.spark-project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2380)
>
>          at
> org.spark-project.guava.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342)
>
>          at
> org.spark-project.guava.cache.LocalCache$Segment.get(LocalCache.java:2257)
>
>          at
> org.spark-project.guava.cache.LocalCache.get(LocalCache.java:4000)
>
>          at org.spark-project.guava.cache.LocalCache.getOrLoad(LocalCache.java:4004)
>          at org.spark-project.guava.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4874)
>          at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.generate(CodeGenerator.scala:286)
>          at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.generate(CodeGenerator.scala:283)
>          at org.apache.spark.sql.execution.SparkPlan.newPredicate(SparkPlan.scala:180)
>          at org.apache.spark.sql.columnar.InMemoryColumnarTableScan$$anonfun$8.apply(InMemoryColumnarTableScan.scala:277)
>          at org.apache.spark.sql.columnar.InMemoryColumnarTableScan$$anonfun$8.apply(InMemoryColumnarTableScan.scala:276)
>          at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
>          at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
>          at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>          at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>          at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>          at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>          at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>          at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>          at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>          at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>          at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>          at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
>          at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>          at org.apache.spark.scheduler.Task.run(Task.scala:70)
>          at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>          at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>          at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>          at java.lang.Thread.run(Thread.java:745)
> Caused by: org.codehaus.commons.compiler.CompileException: Line 14, Column 10: Override
>          at org.codehaus.janino.UnitCompiler.findTypeByName(UnitCompiler.java:6897)
>          at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:5331)
>          at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:5207)
>          at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:5188)
>          at org.codehaus.janino.UnitCompiler.access$12600(UnitCompiler.java:185)
>          at org.codehaus.janino.UnitCompiler$16.visitReferenceType(UnitCompiler.java:5119)
>          at org.codehaus.janino.Java$ReferenceType.accept(Java.java:2880)
>          at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:5159)
>          at org.codehaus.janino.UnitCompiler.hasAnnotation(UnitCompiler.java:830)
>          at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:814)
>          at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:794)
>          at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:507)
>          at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:658)
>          at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:662)
>          at org.codehaus.janino.UnitCompiler.access$600(UnitCompiler.java:185)
>          at org.codehaus.janino.UnitCompiler$2.visitMemberClassDeclaration(UnitCompiler.java:350)
>          at org.codehaus.janino.Java$MemberClassDeclaration.accept(Java.java:1035)
>          at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:354)
>          at org.codehaus.janino.UnitCompiler.compileDeclaredMemberTypes(UnitCompiler.java:769)
>          at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:532)
>          at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:393)
>          at org.codehaus.janino.UnitCompiler.access$400(UnitCompiler.java:185)
>          at org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:347)
>          at org.codehaus.janino.Java$PackageMemberClassDeclaration.accept(Java.java:1139)
>          at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:354)
>          at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:322)
>          at org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:383)
>          at org.codehaus.janino.ClassBodyEvaluator.compileToClass(ClassBodyEvaluator.java:315)
>          at org.codehaus.janino.ClassBodyEvaluator.cook(ClassBodyEvaluator.java:233)
>          at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:192)
>          at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:84)
>          at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:77)
>          at org.codehaus.janino.ClassBodyEvaluator.<init>(ClassBodyEvaluator.java:72)
>          at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.compile(CodeGenerator.scala:246)
>          at org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate$.create(GeneratePredicate.scala:64)
>          at org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate$.create(GeneratePredicate.scala:32)
>          at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:273)
>          at org.spark-project.guava.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
>          at org.spark-project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
>          ... 28 more
> Caused by: java.lang.ClassNotFoundException: Override
>          at org.apache.spark.repl.ExecutorClassLoader.findClass(ExecutorClassLoader.scala:69)
>          at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
>          at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
>          at java.lang.Class.forName0(Native Method)
>          at java.lang.Class.forName(Class.java:270)
>          at org.codehaus.janino.ClassLoaderIClassLoader.findIClass(ClassLoaderIClassLoader.java:78)
>          at org.codehaus.janino.IClassLoader.loadIClass(IClassLoader.java:254)
>          at org.codehaus.janino.UnitCompiler.findTypeByName(UnitCompiler.java:6893)
>          ... 66 more
> Caused by: java.lang.ClassNotFoundException: Override
>          at java.lang.ClassLoader.findClass(ClassLoader.java:531)
>          at org.apache.spark.util.ParentClassLoader.findClass(ParentClassLoader.scala:26)
>          at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
>          at org.apache.spark.util.ParentClassLoader.loadClass(ParentClassLoader.scala:34)
>          at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
>          at org.apache.spark.util.ParentClassLoader.loadClass(ParentClassLoader.scala:30)
>          at org.apache.spark.repl.ExecutorClassLoader.findClass(ExecutorClassLoader.scala:64)
>          ... 73 more

RE: Dataframe aggregation with Tungsten unsafe

Posted by "Ulanov, Alexander" <al...@hp.com>.
I am using Spark 1.5 cloned from master on June 12. (The aggregate unsafe feature was added to Spark on April 29.)

From: Reynold Xin [mailto:rxin@databricks.com]
Sent: Thursday, August 20, 2015 5:26 PM
To: Ulanov, Alexander
Cc: dev@spark.apache.org
Subject: Re: Dataframe aggregation with Tungsten unsafe

Yes - DataFrame and SQL are the same thing.

Which version are you running? Spark 1.4 doesn't run Janino --- but you have a Janino exception?

On Thu, Aug 20, 2015 at 5:01 PM, Ulanov, Alexander <al...@hp.com> wrote:
When I add the following option:
spark.sql.codegen      true

Spark crashed on “df.count” with a java.util.concurrent.ExecutionException (below). Are you sure that I need to set this flag to get the unsafe aggregation? It looks like a SQL flag, and I don’t use SQL.


java.util.concurrent.ExecutionException: org.codehaus.commons.compiler.CompileException: Line 14, Column 10: Override
         at org.spark-project.guava.util.concurrent.AbstractFuture$Sync.getValue(AbstractFuture.java:306)
         at org.spark-project.guava.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:293)
         at org.spark-project.guava.util.concurrent.AbstractFuture.get(AbstractFuture.java:116)
         at org.spark-project.guava.util.concurrent.Uninterruptibles.getUninterruptibly(Uninterruptibles.java:135)
         at org.spark-project.guava.cache.LocalCache$Segment.getAndRecordStats(LocalCache.java:2410)
         at org.spark-project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2380)
         at org.spark-project.guava.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342)
         at org.spark-project.guava.cache.LocalCache$Segment.get(LocalCache.java:2257)
         at org.spark-project.guava.cache.LocalCache.get(LocalCache.java:4000)
         at org.spark-project.guava.cache.LocalCache.getOrLoad(LocalCache.java:4004)
         at org.spark-project.guava.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4874)
         at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.generate(CodeGenerator.scala:286)
         at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.generate(CodeGenerator.scala:283)
         at org.apache.spark.sql.execution.SparkPlan.newPredicate(SparkPlan.scala:180)
         at org.apache.spark.sql.columnar.InMemoryColumnarTableScan$$anonfun$8.apply(InMemoryColumnarTableScan.scala:277)
         at org.apache.spark.sql.columnar.InMemoryColumnarTableScan$$anonfun$8.apply(InMemoryColumnarTableScan.scala:276)
         at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
         at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
         at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
         at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
         at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
         at org.apache.spark.scheduler.Task.run(Task.scala:70)
         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
         at java.lang.Thread.run(Thread.java:745)
Caused by: org.codehaus.commons.compiler.CompileException: Line 14, Column 10: Override
         at org.codehaus.janino.UnitCompiler.findTypeByName(UnitCompiler.java:6897)
         at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:5331)
         at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:5207)
         at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:5188)
         at org.codehaus.janino.UnitCompiler.access$12600(UnitCompiler.java:185)
         at org.codehaus.janino.UnitCompiler$16.visitReferenceType(UnitCompiler.java:5119)
         at org.codehaus.janino.Java$ReferenceType.accept(Java.java:2880)
         at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:5159)
         at org.codehaus.janino.UnitCompiler.hasAnnotation(UnitCompiler.java:830)
         at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:814)
         at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:794)
         at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:507)
         at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:658)
         at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:662)
         at org.codehaus.janino.UnitCompiler.access$600(UnitCompiler.java:185)
         at org.codehaus.janino.UnitCompiler$2.visitMemberClassDeclaration(UnitCompiler.java:350)
         at org.codehaus.janino.Java$MemberClassDeclaration.accept(Java.java:1035)
         at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:354)
         at org.codehaus.janino.UnitCompiler.compileDeclaredMemberTypes(UnitCompiler.java:769)
         at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:532)
         at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:393)
         at org.codehaus.janino.UnitCompiler.access$400(UnitCompiler.java:185)
         at org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:347)
         at org.codehaus.janino.Java$PackageMemberClassDeclaration.accept(Java.java:1139)
         at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:354)
         at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:322)
         at org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:383)
         at org.codehaus.janino.ClassBodyEvaluator.compileToClass(ClassBodyEvaluator.java:315)
         at org.codehaus.janino.ClassBodyEvaluator.cook(ClassBodyEvaluator.java:233)
         at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:192)
         at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:84)
         at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:77)
         at org.codehaus.janino.ClassBodyEvaluator.<init>(ClassBodyEvaluator.java:72)
         at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.compile(CodeGenerator.scala:246)
         at org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate$.create(GeneratePredicate.scala:64)
         at org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate$.create(GeneratePredicate.scala:32)
         at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:273)
         at org.spark-project.guava.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
         at org.spark-project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
         ... 28 more
Caused by: java.lang.ClassNotFoundException: Override
         at org.apache.spark.repl.ExecutorClassLoader.findClass(ExecutorClassLoader.scala:69)
         at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
         at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
         at java.lang.Class.forName0(Native Method)
         at java.lang.Class.forName(Class.java:270)
         at org.codehaus.janino.ClassLoaderIClassLoader.findIClass(ClassLoaderIClassLoader.java:78)
         at org.codehaus.janino.IClassLoader.loadIClass(IClassLoader.java:254)
         at org.codehaus.janino.UnitCompiler.findTypeByName(UnitCompiler.java:6893)
         ... 66 more
Caused by: java.lang.ClassNotFoundException: Override
         at java.lang.ClassLoader.findClass(ClassLoader.java:531)
         at org.apache.spark.util.ParentClassLoader.findClass(ParentClassLoader.scala:26)
         at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
         at org.apache.spark.util.ParentClassLoader.loadClass(ParentClassLoader.scala:34)
         at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
         at org.apache.spark.util.ParentClassLoader.loadClass(ParentClassLoader.scala:30)
         at org.apache.spark.repl.ExecutorClassLoader.findClass(ExecutorClassLoader.scala:64)
         ... 73 more
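
For anyone reading the trace: it nests several exceptions, and the innermost one (ClassNotFoundException: Override) is the actual failure. Below is a minimal sketch, in plain Scala, of walking such a cause chain programmatically. The names Causes, rootCause and causeChain are made up for illustration and are not part of Spark or Janino.

```scala
import scala.annotation.tailrec

object Causes {
  /** Follows getCause links until a throwable has no further cause. */
  @tailrec
  def rootCause(t: Throwable): Throwable = {
    val next = t.getCause
    if (next == null) t else rootCause(next)
  }

  /** Lists every throwable in the chain, outermost first. */
  def causeChain(t: Throwable): List[Throwable] =
    Iterator.iterate(t)(_.getCause).takeWhile(_ != null).toList
}
```

Catching the exception around the failing action and printing Causes.causeChain(e).map(_.toString) would give a one-line-per-cause summary instead of the full trace.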


From: Reynold Xin [mailto:rxin@databricks.com]
Sent: Thursday, August 20, 2015 4:22 PM
To: Ulanov, Alexander
Cc: dev@spark.apache.org
Subject: Re: Dataframe aggregation with Tungsten unsafe

I think you might need to turn codegen on also in order for the unsafe stuff to work.


On Thu, Aug 20, 2015 at 4:09 PM, Ulanov, Alexander <al...@hp.com> wrote:
Hi Reynold,

Thank you for the suggestion. This code takes around 30 sec on my setup (5 workers with 32GB). My issue is that I don't see any change in time when I unset the unsafe flags. Could you explain why that might happen?

On Aug 20, 2015, at 15:32, Reynold Xin <rx...@databricks.com> wrote:

I didn't wait long enough earlier. Actually it did finish when I raised memory to 8g.

In 1.5 with Tungsten (which should be the same as 1.4 with your unsafe flags), the query took 40s with 4G of mem.

In 1.4, it took 195s with 8G of mem.

This is not a scientific benchmark and I only ran it once.



On Thu, Aug 20, 2015 at 3:22 PM, Reynold Xin <rx...@databricks.com> wrote:
How did you run this? I couldn't run your query with 4G of RAM in 1.4, but in 1.5 it ran.

Also I recommend just dumping the data to parquet on disk to evaluate, rather than using the in-memory cache, which is super slow and we are thinking of removing/replacing with something else.


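// spark-shell imports these two automatically; they are needed elsewhere
import org.apache.spark.sql.functions.sum
import sqlContext.implicits._

val size = 100000000
val partitions = 10
val repetitions = 5
// random (key, value) rows; keys fall in [0, size / repetitions)
val data = sc.parallelize(1 to size, partitions).map(x => (util.Random.nextInt(size / repetitions), util.Random.nextDouble)).toDF("key", "value")

// write the data out once so the query reads Parquet from disk instead of the in-memory cache
data.write.parquet("/scratch/rxin/tmp/alex")


val df = sqlContext.read.parquet("/scratch/rxin/tmp/alex")
val t = System.nanoTime()
val res = df.groupBy("key").agg(sum("value"))
res.count()
println((System.nanoTime() - t) / 1e9)  // elapsed seconds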
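
Since a single run is not much of a benchmark, here is a rough sketch, in plain Scala, of timing a block several times after a warm-up. BenchTiming, timeRuns and median are hypothetical helper names, not Spark APIs, and the warm-up and run counts are arbitrary.

```scala
object BenchTiming {
  /** Runs `body` `warmups` times untimed, then `runs` times timed.
    * Returns the elapsed seconds of each timed run, in order. */
  def timeRuns(warmups: Int, runs: Int)(body: => Unit): Seq[Double] = {
    (1 to warmups).foreach(_ => body)
    (1 to runs).map { _ =>
      val start = System.nanoTime()
      body
      (System.nanoTime() - start) / 1e9
    }
  }

  /** Median of a non-empty sequence (upper middle element for even lengths). */
  def median(xs: Seq[Double]): Double = xs.sorted.apply(xs.length / 2)
}
```

In spark-shell this could wrap the query, for example BenchTiming.timeRuns(1, 5) { df.groupBy("key").agg(sum("value")).foreach(_ => ()) }, using foreach rather than count() as recommended elsewhere in this thread so that the sum is actually computed.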

Re: Dataframe aggregation with Tungsten unsafe

Posted by Reynold Xin <rx...@databricks.com>.
Yes - DataFrame and SQL are the same thing.

Which version are you running? Spark 1.4 doesn't run Janino --- but you
have a Janino exception?
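
To answer the version question and to see which flags the session actually picked up, something like the following could be pasted into spark-shell. It is only a sketch, assuming a 1.4/1.5-era build where sc and sqlContext are predefined. The flag names are the ones quoted in this thread, and "(unset)" is just a placeholder default.

```scala
// Paste into spark-shell: `sc` and `sqlContext` are assumed to be predefined there.
println("Spark version: " + sc.version)

// spark.sql.* flags live in the SQLContext configuration, which DataFrame queries share.
Seq("spark.sql.codegen", "spark.sql.unsafe.enabled").foreach { key =>
  println(key + " = " + sqlContext.getConf(key, "(unset)"))
}

// spark.unsafe.offHeap is a core setting, so it is read from the SparkConf.
println("spark.unsafe.offHeap = " + sc.getConf.get("spark.unsafe.offHeap", "(unset)"))
```

Whether a given build honours each flag is a separate matter; this only shows what the running session has configured.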

RE: Dataframe aggregation with Tungsten unsafe

Posted by "Ulanov, Alexander" <al...@hp.com>.
When I add the following option:
spark.sql.codegen      true

Spark crashes on "df.count" with a java.util.concurrent.ExecutionException (below). Are you sure that I need to set this flag to get unsafe? It looks like a SQL flag, and I don't use SQL.


java.util.concurrent.ExecutionException: org.codehaus.commons.compiler.CompileException: Line 14, Column 10: Override
         at org.spark-project.guava.util.concurrent.AbstractFuture$Sync.getValue(AbstractFuture.java:306)
         at org.spark-project.guava.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:293)
         at org.spark-project.guava.util.concurrent.AbstractFuture.get(AbstractFuture.java:116)
         at org.spark-project.guava.util.concurrent.Uninterruptibles.getUninterruptibly(Uninterruptibles.java:135)
         at org.spark-project.guava.cache.LocalCache$Segment.getAndRecordStats(LocalCache.java:2410)
         at org.spark-project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2380)
         at org.spark-project.guava.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342)
         at org.spark-project.guava.cache.LocalCache$Segment.get(LocalCache.java:2257)
         at org.spark-project.guava.cache.LocalCache.get(LocalCache.java:4000)
         at org.spark-project.guava.cache.LocalCache.getOrLoad(LocalCache.java:4004)
         at org.spark-project.guava.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4874)
         at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.generate(CodeGenerator.scala:286)
         at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.generate(CodeGenerator.scala:283)
         at org.apache.spark.sql.execution.SparkPlan.newPredicate(SparkPlan.scala:180)
         at org.apache.spark.sql.columnar.InMemoryColumnarTableScan$$anonfun$8.apply(InMemoryColumnarTableScan.scala:277)
         at org.apache.spark.sql.columnar.InMemoryColumnarTableScan$$anonfun$8.apply(InMemoryColumnarTableScan.scala:276)
         at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
         at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
         at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
         at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
         at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
         at org.apache.spark.scheduler.Task.run(Task.scala:70)
         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
         at java.lang.Thread.run(Thread.java:745)
Caused by: org.codehaus.commons.compiler.CompileException: Line 14, Column 10: Override
         at org.codehaus.janino.UnitCompiler.findTypeByName(UnitCompiler.java:6897)
         at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:5331)
         at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:5207)
         at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:5188)
         at org.codehaus.janino.UnitCompiler.access$12600(UnitCompiler.java:185)
         at org.codehaus.janino.UnitCompiler$16.visitReferenceType(UnitCompiler.java:5119)
         at org.codehaus.janino.Java$ReferenceType.accept(Java.java:2880)
         at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:5159)
         at org.codehaus.janino.UnitCompiler.hasAnnotation(UnitCompiler.java:830)
         at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:814)
         at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:794)
         at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:507)
         at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:658)
         at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:662)
         at org.codehaus.janino.UnitCompiler.access$600(UnitCompiler.java:185)
         at org.codehaus.janino.UnitCompiler$2.visitMemberClassDeclaration(UnitCompiler.java:350)
         at org.codehaus.janino.Java$MemberClassDeclaration.accept(Java.java:1035)
         at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:354)
         at org.codehaus.janino.UnitCompiler.compileDeclaredMemberTypes(UnitCompiler.java:769)
         at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:532)
         at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:393)
         at org.codehaus.janino.UnitCompiler.access$400(UnitCompiler.java:185)
         at org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:347)
         at org.codehaus.janino.Java$PackageMemberClassDeclaration.accept(Java.java:1139)
         at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:354)
         at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:322)
         at org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:383)
         at org.codehaus.janino.ClassBodyEvaluator.compileToClass(ClassBodyEvaluator.java:315)
         at org.codehaus.janino.ClassBodyEvaluator.cook(ClassBodyEvaluator.java:233)
         at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:192)
         at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:84)
         at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:77)
         at org.codehaus.janino.ClassBodyEvaluator.<init>(ClassBodyEvaluator.java:72)
         at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.compile(CodeGenerator.scala:246)
         at org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate$.create(GeneratePredicate.scala:64)
         at org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate$.create(GeneratePredicate.scala:32)
         at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:273)
         at org.spark-project.guava.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
         at org.spark-project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
         ... 28 more
Caused by: java.lang.ClassNotFoundException: Override
         at org.apache.spark.repl.ExecutorClassLoader.findClass(ExecutorClassLoader.scala:69)
         at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
         at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
         at java.lang.Class.forName0(Native Method)
         at java.lang.Class.forName(Class.java:270)
         at org.codehaus.janino.ClassLoaderIClassLoader.findIClass(ClassLoaderIClassLoader.java:78)
         at org.codehaus.janino.IClassLoader.loadIClass(IClassLoader.java:254)
         at org.codehaus.janino.UnitCompiler.findTypeByName(UnitCompiler.java:6893)
         ... 66 more
Caused by: java.lang.ClassNotFoundException: Override
         at java.lang.ClassLoader.findClass(ClassLoader.java:531)
         at org.apache.spark.util.ParentClassLoader.findClass(ParentClassLoader.scala:26)
         at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
         at org.apache.spark.util.ParentClassLoader.loadClass(ParentClassLoader.scala:34)
         at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
         at org.apache.spark.util.ParentClassLoader.loadClass(ParentClassLoader.scala:30)
         at org.apache.spark.repl.ExecutorClassLoader.findClass(ExecutorClassLoader.scala:64)
         ... 73 more


From: Reynold Xin [mailto:rxin@databricks.com]
Sent: Thursday, August 20, 2015 4:22 PM
To: Ulanov, Alexander
Cc: dev@spark.apache.org
Subject: Re: Dataframe aggregation with Tungsten unsafe

I think you might need to turn codegen on also in order for the unsafe stuff to work.



Re: Dataframe aggregation with Tungsten unsafe

Posted by Reynold Xin <rx...@databricks.com>.
I think you might also need to turn codegen on for the unsafe stuff to
work.


On Thu, Aug 20, 2015 at 4:09 PM, Ulanov, Alexander <al...@hp.com>
wrote:

> Hi Reynold,
>
> Thank you for suggestion. This code takes around 30 sec on my setup (5
> workers with 32GB). My issue is that I don't see the change in time if I
> unset the unsafe flags. Could you explain why it might happen?
>

Re: Dataframe aggregation with Tungsten unsafe

Posted by "Ulanov, Alexander" <al...@hp.com>.
Hi Reynold,

Thank you for the suggestion. This code takes around 30 sec on my setup (5 workers with 32GB). My issue is that I don't see any change in the time when I unset the unsafe flags. Could you explain why that might happen?

On Aug 20, 2015, at 15:32, Reynold Xin <rx...@databricks.com> wrote:

 I didn't wait long enough earlier. Actually it did finish when I raised memory to 8g.

In 1.5 with Tungsten (which should be the same as 1.4 with your unsafe flags), the query took 40s with 4G of mem.

In 1.4, it took 195s with 8G of mem.

This is not a scientific benchmark and I only ran it once.








Re: Dataframe aggregation with Tungsten unsafe

Posted by Reynold Xin <rx...@databricks.com>.
I didn't wait long enough earlier. Actually, it did finish in 1.4 when I
raised memory to 8g.

In 1.5 with Tungsten (which should be the same as 1.4 with your unsafe
flags), the query took 40s with 4G of mem.

In 1.4, it took 195s with 8G of mem.

This is not a scientific benchmark and I only ran it once.
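
Since a single run mixes JIT warm-up and caching effects into the number, one way to make the comparison less noisy is to repeat the timed action and report the median. The following is a minimal sketch in plain Scala, not part of Spark; BenchTimer, timeRuns, and median are hypothetical names introduced here for illustration.

```scala
object BenchTimer {
  /** Median of a non-empty sequence of measurements. */
  def median(xs: Seq[Double]): Double = {
    require(xs.nonEmpty, "need at least one measurement")
    val sorted = xs.sorted
    val mid = sorted.length / 2
    if (sorted.length % 2 == 1) sorted(mid)
    else (sorted(mid - 1) + sorted(mid)) / 2.0
  }

  /**
   * Evaluates `body` `warmups` times without timing it, then `runs` times
   * with timing. Returns the elapsed seconds of each timed run.
   */
  def timeRuns[A](warmups: Int, runs: Int)(body: => A): Seq[Double] = {
    require(runs > 0, "runs must be positive")
    (1 to warmups).foreach(_ => body)
    (1 to runs).map { _ =>
      val start = System.nanoTime()
      body
      (System.nanoTime() - start) / 1e9
    }
  }
}
```

In spark-shell this could wrap the action being measured, for example BenchTimer.median(BenchTimer.timeRuns(warmups = 1, runs = 5) { res.count() }), run once with the unsafe flags set and once without.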



On Thu, Aug 20, 2015 at 3:22 PM, Reynold Xin <rx...@databricks.com> wrote:

> How did you run this? I couldn't run your query with 4G of RAM in 1.4, but
> in 1.5 it ran.
>
> Also I recommend just dumping the data to parquet on disk to evaluate,
> rather than using the in-memory cache, which is super slow and we are
> thinking of removing/replacing with something else.
>
>
> val size = 100000000
> val partitions = 10
> val repetitions = 5
> val data = sc.parallelize(1 to size, partitions).map(x =>
> (util.Random.nextInt(size / repetitions),
> util.Random.nextDouble)).toDF("key", "value")
>
> data.write.parquet("/scratch/rxin/tmp/alex")
>
>
> val df = sqlContext.read.parquet("/scratch/rxin/tmp/alex")
> val t = System.nanoTime()
> val res = df.groupBy("key").agg(sum("value"))
> res.count()
> println((System.nanoTime() - t) / 1e9)
>
>
>

Re: Dataframe aggregation with Tungsten unsafe

Posted by Reynold Xin <rx...@databricks.com>.
How did you run this? I couldn't run your query with 4G of RAM in 1.4, but
in 1.5 it ran.

Also, I recommend just dumping the data to Parquet on disk for the
evaluation, rather than using the in-memory cache, which is super slow; we
are thinking of removing it or replacing it with something else.


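// Run in spark-shell (Spark 1.4/1.5), where sc and sqlContext are predefined.
import org.apache.spark.sql.functions.sum
import sqlContext.implicits._  // needed for toDF

val size = 100000000
val partitions = 10
val repetitions = 5

// Generate (key, value) rows; each key is expected to occur about `repetitions` times.
val data = sc.parallelize(1 to size, partitions).map { x =>
  (util.Random.nextInt(size / repetitions), util.Random.nextDouble)
}.toDF("key", "value")

// Write the data to Parquet once, so the benchmark reads from disk
// rather than from the in-memory cache.
data.write.parquet("/scratch/rxin/tmp/alex")


val df = sqlContext.read.parquet("/scratch/rxin/tmp/alex")
val t = System.nanoTime()
val res = df.groupBy("key").agg(sum("value"))
res.count()
println((System.nanoTime() - t) / 1e9)  // elapsed seconds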
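
For checking the result on a small sample, the same group-by-key sum can be sketched in plain Scala without Spark. This is only an illustration of what the query computes, not of how Tungsten implements it; TinyAggregation, generate, and sumByKey are hypothetical names, and the fixed seed is an assumption added for reproducibility.

```scala
import scala.collection.mutable
import scala.util.Random

object TinyAggregation {
  /** Generates `size` (key, value) pairs with keys drawn from [0, size / repetitions). */
  def generate(size: Int, repetitions: Int, seed: Long): Vector[(Int, Double)] = {
    val rng = new Random(seed)
    Vector.fill(size)((rng.nextInt(size / repetitions), rng.nextDouble()))
  }

  /** Sums the values per key, updating each running total in place in a hash map. */
  def sumByKey(rows: Iterable[(Int, Double)]): Map[Int, Double] = {
    val totals = mutable.HashMap.empty[Int, Double]
    rows.foreach { case (k, v) =>
      totals.update(k, totals.getOrElse(k, 0.0) + v)
    }
    totals.toMap
  }
}
```

With size = 1000 and repetitions = 5, keys fall in [0, 200), so each key is expected to appear about five times, mirroring the key distribution of the full-size benchmark.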



On Thu, Aug 20, 2015 at 2:57 PM, Ulanov, Alexander <al...@hp.com>
wrote:

> Dear Spark developers,
>
>
>
> I am trying to benchmark the new Dataframe aggregation implemented under
> the project Tungsten and released with Spark 1.4 (I am using the latest
> Spark from the repo, i.e. 1.5):
>
> https://github.com/apache/spark/pull/5725
>
> It tells that the aggregation should be faster due to using the unsafe to
> allocate memory and in-place update. It was also presented on Spark Summit
> this Summer:
>
>
> http://www.slideshare.net/SparkSummit/deep-dive-into-project-tungsten-josh-rosen
>
> The following enables the new aggregation in spark-config:
>
> spark.sql.unsafe.enabled=true
>
> spark.unsafe.offHeap=true
>
>
>
> I wrote a simple code that does aggregation of values by keys. However,
> the time needed to execute the code does not depend if the new aggregation
> is on or off. Could you suggest how can I observe the improvement that the
> aggregation provides? Could you write a code snippet that takes advantage
> of the new aggregation?
>
>
>
> case class Counter(key: Int, value: Double)
>
> val size = 100000000
>
> val partitions = 5
>
> val repetitions = 5
>
> val data = sc.parallelize(1 to size, partitions).map(x =>
> Counter(util.Random.nextInt(size / repetitions), util.Random.nextDouble))
>
> val df = sqlContext.createDataFrame(data)
>
> df.persist()
>
> df.count()
>
> val t = System.nanoTime()
>
> val res = df.groupBy("key").agg(sum("value"))
>
> res.count()
>
> println((System.nanoTime() - t) / 1e9)
>
>
>
>
>
> Best regards, Alexander
>