Posted to user@spark.apache.org by Jerry Lam <ch...@gmail.com> on 2015/10/25 02:35:33 UTC

[Spark SQL]: Spark Job Hangs on the refresh method when saving over 1 million files

Hi Spark users and developers,

Has anyone run into an issue where a Spark SQL job that produces a lot of
files (over 1 million) hangs on the refresh method? I'm using Spark 1.5.1.
Below is the stack trace. I can see that the Parquet files are produced,
but the driver is doing something very intensive (it uses all the CPUs).
Does this mean Spark SQL cannot be used to produce over 1 million files in
a single job?

Thread 528: (state = BLOCKED)
 - java.util.Arrays.copyOf(char[], int) @bci=1, line=2367 (Compiled frame)
 - java.lang.AbstractStringBuilder.expandCapacity(int) @bci=43, line=130 (Compiled frame)
 - java.lang.AbstractStringBuilder.ensureCapacityInternal(int) @bci=12, line=114 (Compiled frame)
 - java.lang.AbstractStringBuilder.append(java.lang.String) @bci=19, line=415 (Compiled frame)
 - java.lang.StringBuilder.append(java.lang.String) @bci=2, line=132 (Compiled frame)
 - org.apache.hadoop.fs.Path.toString() @bci=128, line=384 (Compiled frame)
 - org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache$$anonfun$listLeafFiles$1.apply(org.apache.hadoop.fs.FileStatus) @bci=4, line=447 (Compiled frame)
 - org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache$$anonfun$listLeafFiles$1.apply(java.lang.Object) @bci=5, line=447 (Compiled frame)
 - scala.collection.TraversableLike$$anonfun$map$1.apply(java.lang.Object) @bci=9, line=244 (Compiled frame)
 - scala.collection.TraversableLike$$anonfun$map$1.apply(java.lang.Object) @bci=2, line=244 (Compiled frame)
 - scala.collection.IndexedSeqOptimized$class.foreach(scala.collection.IndexedSeqOptimized, scala.Function1) @bci=22, line=33 (Compiled frame)
 - scala.collection.mutable.ArrayOps$ofRef.foreach(scala.Function1) @bci=2, line=108 (Compiled frame)
 - scala.collection.TraversableLike$class.map(scala.collection.TraversableLike, scala.Function1, scala.collection.generic.CanBuildFrom) @bci=17, line=244 (Compiled frame)
 - scala.collection.mutable.ArrayOps$ofRef.map(scala.Function1, scala.collection.generic.CanBuildFrom) @bci=3, line=108 (Interpreted frame)
 - org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache.listLeafFiles(java.lang.String[]) @bci=279, line=447 (Interpreted frame)
 - org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache.refresh() @bci=8, line=453 (Interpreted frame)
 - org.apache.spark.sql.sources.HadoopFsRelation.org$apache$spark$sql$sources$HadoopFsRelation$$fileStatusCache$lzycompute() @bci=26, line=465 (Interpreted frame)
 - org.apache.spark.sql.sources.HadoopFsRelation.org$apache$spark$sql$sources$HadoopFsRelation$$fileStatusCache() @bci=12, line=463 (Interpreted frame)
 - org.apache.spark.sql.sources.HadoopFsRelation.refresh() @bci=1, line=540 (Interpreted frame)
 - org.apache.spark.sql.execution.datasources.parquet.ParquetRelation.refresh() @bci=1, line=204 (Interpreted frame)
 - org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply$mcV$sp() @bci=392, line=152 (Interpreted frame)
 - org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply() @bci=1, line=108 (Interpreted frame)
 - org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply() @bci=1, line=108 (Interpreted frame)
 - org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(org.apache.spark.sql.SQLContext, org.apache.spark.sql.SQLContext$QueryExecution, scala.Function0) @bci=96, line=56 (Interpreted frame)
 - org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(org.apache.spark.sql.SQLContext) @bci=718, line=108 (Interpreted frame)
 - org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute() @bci=20, line=57 (Interpreted frame)
 - org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult() @bci=15, line=57 (Interpreted frame)
 - org.apache.spark.sql.execution.ExecutedCommand.doExecute() @bci=12, line=69 (Interpreted frame)
 - org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply() @bci=11, line=140 (Interpreted frame)
 - org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply() @bci=1, line=138 (Interpreted frame)
 - org.apache.spark.rdd.RDDOperationScope$.withScope(org.apache.spark.SparkContext, java.lang.String, boolean, boolean, scala.Function0) @bci=131, line=147 (Interpreted frame)
 - org.apache.spark.sql.execution.SparkPlan.execute() @bci=189, line=138 (Interpreted frame)
 - org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute() @bci=21, line=933 (Interpreted frame)
 - org.apache.spark.sql.SQLContext$QueryExecution.toRdd() @bci=13, line=933 (Interpreted frame)
 - org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(org.apache.spark.sql.SQLContext, java.lang.String, java.lang.String[], org.apache.spark.sql.SaveMode, scala.collection.immutable.Map, org.apache.spark.sql.DataFrame) @bci=293, line=197 (Interpreted frame)
 - org.apache.spark.sql.DataFrameWriter.save() @bci=64, line=146 (Interpreted frame)
 - org.apache.spark.sql.DataFrameWriter.save(java.lang.String) @bci=24, line=137 (Interpreted frame)
 - org.apache.spark.sql.DataFrameWriter.parquet(java.lang.String) @bci=8, line=304 (Interpreted frame)

Best Regards,

Jerry

Re: [Spark SQL]: Spark Job Hangs on the refresh method when saving over 1 million files

Posted by Koert Kuipers <ko...@tresata.com>.
It seems HadoopFsRelation keeps track of all part files (instead of just
the data directories). I believe this has something to do with Parquet
footers, but I didn't look into it further. The result is that, on the
driver side, it:
1) tries to keep track of all part files in a Map[Path, FileStatus]
2) also tries to serialize the paths to all part files (instead of just
the data directories) into the Hadoop JobConf object (or creates one
JobConf per part file in the case of spark-avro)

I agree this approach is not scalable... I ran into it myself with
spark-avro, where a job simply never gets started on a large number of
part files. I am still trying to understand why all the part files need to
be tracked on the driver side, but I am pretty sure I plan to remove this
in our in-house Spark version.

I also noticed code that assumes the schema of every part file can be
different (even within the same partition, which seems unlikely, except
perhaps when using insert), and the code tries to reconcile the schemas
across all part files... I do not think this is scalable either.

Sorry, this became a bit of a rant.
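
For illustration, here is a minimal sketch, not the actual Spark source, of
the driver-side pattern described above: recursively listing every leaf
(part) file and caching one FileStatus per file. Object and method names
are illustrative only; the point is that the cache grows with the number of
part files, not with the number of data directories.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}

// Illustrative sketch only (not the Spark source): cache a FileStatus for
// every leaf file under the given root paths, the way the listLeafFiles /
// FileStatusCache frames in the trace above suggest.
object LeafFileListingSketch {
  // Recursively collect the FileStatus of every file (leaf) under dir.
  def listLeafFiles(fs: FileSystem, dir: Path): Seq[FileStatus] = {
    val (dirs, files) = fs.listStatus(dir).toSeq.partition(_.isDirectory)
    files ++ dirs.flatMap(d => listLeafFiles(fs, d.getPath))
  }

  // One map entry per part file: with ~1M output files this map, plus the
  // Path-to-String conversions seen in the trace, lives entirely on the driver.
  def buildStatusCache(rootPaths: Seq[String]): Map[Path, FileStatus] = {
    val conf = new Configuration()
    rootPaths.flatMap { p =>
      val root = new Path(p)
      val fs   = root.getFileSystem(conf)
      listLeafFiles(fs, root).map(status => status.getPath -> status)
    }.toMap
  }
}

Tracking only the data (partition) directories instead of each part file
would keep this bookkeeping proportional to the number of partitions rather
than the number of files, which is essentially what is being argued for above.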


On Mon, Oct 26, 2015 at 9:56 AM, Jerry Lam <ch...@gmail.com> wrote:

> Hi Fengdong,
>
> Why it needs more memory at the driver side when there are many
> partitions? It seems the implementation can only support use cases for a
> dozen of partition when it is over 100, it fails apart. It is also quite
> slow to initialize the loading of partition tables when the number of
> partition is over 100.
>
> Best Regards,
>
> Jerry
>
> Sent from my iPhone
>
> On 26 Oct, 2015, at 2:50 am, Fengdong Yu <fe...@everstring.com> wrote:
>
> How many partitions you generated?
> if Millions generated, then there is a huge memory consumed.
>
>
>
>
>
> On Oct 26, 2015, at 10:58 AM, Jerry Lam <ch...@gmail.com> wrote:
>
> Hi guys,
>
> I mentioned that the partitions are generated so I tried to read the
> partition data from it. The driver is OOM after few minutes. The stack
> trace is below. It looks very similar to the the jstack above (note on the
> refresh method). Thanks!
>
> Name: java.lang.OutOfMemoryError
> Message: GC overhead limit exceeded
> StackTrace: java.util.Arrays.copyOf(Arrays.java:2367)
> java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:130)
> java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:114)
> java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:415)
> java.lang.StringBuilder.append(StringBuilder.java:132)
> org.apache.hadoop.fs.Path.toString(Path.java:384)
> org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache$$anonfun$listLeafFiles$1.apply(interfaces.scala:447)
> org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache$$anonfun$listLeafFiles$1.apply(interfaces.scala:447)
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
> scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
> scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
> org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache.listLeafFiles(interfaces.scala:447)
> org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache.refresh(interfaces.scala:453)
> org.apache.spark.sql.sources.HadoopFsRelation.org$apache$spark$sql$sources$HadoopFsRelation$$fileStatusCache$lzycompute(interfaces.scala:465)
> org.apache.spark.sql.sources.HadoopFsRelation.org$apache$spark$sql$sources$HadoopFsRelation$$fileStatusCache(interfaces.scala:463)
> org.apache.spark.sql.sources.HadoopFsRelation.cachedLeafStatuses(interfaces.scala:470)
> org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$MetadataCache.refresh(ParquetRelation.scala:381)
> org.apache.spark.sql.execution.datasources.parquet.ParquetRelation.org$apache$spark$sql$execution$datasources$parquet$ParquetRelation$$metadataCache$lzycompute(ParquetRelation.scala:145)
> org.apache.spark.sql.execution.datasources.parquet.ParquetRelation.org$apache$spark$sql$execution$datasources$parquet$ParquetRelation$$metadataCache(ParquetRelation.scala:143)
> org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$$anonfun$6.apply(ParquetRelation.scala:196)
> org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$$anonfun$6.apply(ParquetRelation.scala:196)
> scala.Option.getOrElse(Option.scala:120)
> org.apache.spark.sql.execution.datasources.parquet.ParquetRelation.dataSchema(ParquetRelation.scala:196)
> org.apache.spark.sql.sources.HadoopFsRelation.schema$lzycompute(interfaces.scala:561)
> org.apache.spark.sql.sources.HadoopFsRelation.schema(interfaces.scala:560)
> org.apache.spark.sql.execution.datasources.LogicalRelation.<init>(LogicalRelation.scala:31)
> org.apache.spark.sql.SQLContext.baseRelationToDataFrame(SQLContext.scala:395)
> org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:267)
>
>
> On Sun, Oct 25, 2015 at 10:25 PM, Jerry Lam <ch...@gmail.com> wrote:
>
>> Hi Josh,
>>
>> No I don't have speculation enabled. The driver took about few hours
>> until it was OOM. Interestingly, all partitions are generated successfully
>> (_SUCCESS file is written in the output directory). Is there a reason why
>> the driver needs so much memory? The jstack revealed that it called refresh
>> some file statuses. Is there a way to avoid OutputCommitCoordinator to
>> use so much memory?
>>
>> Ultimately, I choose to use partitions because most of the queries I have
>> will execute based the partition field. For example, "SELECT events from
>> customer where customer_id = 1234". If the partition is based on
>> customer_id, all events for a customer can be easily retrieved without
>> filtering the entire dataset which is much more efficient (I hope).
>> However, I notice that the implementation of the partition logic does not
>> seem to allow this type of use cases without using a lot of memory which is
>> a bit odd in my opinion. Any help will be greatly appreciated.
>>
>> Best Regards,
>>
>> Jerry
>>
>>
>>
>> On Sun, Oct 25, 2015 at 9:25 PM, Josh Rosen <ro...@gmail.com> wrote:
>>
>>> Hi Jerry,
>>>
>>> Do you have speculation enabled? A write which produces one million
>>> files / output partitions might be using tons of driver memory via the
>>> OutputCommitCoordinator's bookkeeping data structures.
>>>
>>> On Sun, Oct 25, 2015 at 5:50 PM, Jerry Lam <ch...@gmail.com> wrote:
>>>
>>>> Hi spark guys,
>>>>
>>>> I think I hit the same issue SPARK-8890
>>>> https://issues.apache.org/jira/browse/SPARK-8890. It is marked as
>>>> resolved. However it is not. I have over a million output directories for 1
>>>> single column in partitionBy. Not sure if this is a regression issue? Do I
>>>> need to set some parameters to make it more memory efficient?
>>>>
>>>> Best Regards,
>>>>
>>>> Jerry
>>>>
>>>>
>>>>
>>>>
>>>> On Sun, Oct 25, 2015 at 8:39 PM, Jerry Lam <ch...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi guys,
>>>>>
>>>>> After waiting for a day, it actually causes OOM on the spark driver. I
>>>>> configure the driver to have 6GB. Note that I didn't call refresh myself.
>>>>> The method was called when saving the dataframe in parquet format. Also I'm
>>>>> using partitionBy() on the DataFrameWriter to generate over 1 million
>>>>> files. Not sure why it OOM the driver after the job is marked _SUCCESS in
>>>>> the output folder.
>>>>>
>>>>> Best Regards,
>>>>>
>>>>> Jerry
>>>>>
>>>>>
>>>>> On Sat, Oct 24, 2015 at 9:35 PM, Jerry Lam <ch...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Spark users and developers,
>>>>>>
>>>>>> Does anyone encounter any issue when a spark SQL job produces a lot
>>>>>> of files (over 1 millions), the job hangs on the refresh method? I'm using
>>>>>> spark 1.5.1. Below is the stack trace. I saw the parquet files are produced
>>>>>> but the driver is doing something very intensively (it uses all the cpus).
>>>>>> Does it mean Spark SQL cannot be used to produce over 1 million files in a
>>>>>> single job?
>>>>>>
>>>>>> Thread 528: (state = BLOCKED)
>>>>>>  - java.util.Arrays.copyOf(char[], int) @bci=1, line=2367 (Compiled
>>>>>> frame)
>>>>>>  - java.lang.AbstractStringBuilder.expandCapacity(int) @bci=43,
>>>>>> line=130 (Compiled frame)
>>>>>>  - java.lang.AbstractStringBuilder.ensureCapacityInternal(int)
>>>>>> @bci=12, line=114 (Compiled frame)
>>>>>>  - java.lang.AbstractStringBuilder.append(java.lang.String) @bci=19,
>>>>>> line=415 (Compiled frame)
>>>>>>  - java.lang.StringBuilder.append(java.lang.String) @bci=2, line=132
>>>>>> (Compiled frame)
>>>>>>  - org.apache.hadoop.fs.Path.toString() @bci=128, line=384 (Compiled
>>>>>> frame)
>>>>>>  -
>>>>>> org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache$$anonfun$listLeafFiles$1.apply(org.apache.hadoop.fs.FileStatus)
>>>>>> @bci=4, line=447 (Compiled frame)
>>>>>>  -
>>>>>> org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache$$anonfun$listLeafFiles$1.apply(java.lang.Object)
>>>>>> @bci=5, line=447 (Compiled frame)
>>>>>>  -
>>>>>> scala.collection.TraversableLike$$anonfun$map$1.apply(java.lang.Object)
>>>>>> @bci=9, line=244 (Compiled frame)
>>>>>>  -
>>>>>> scala.collection.TraversableLike$$anonfun$map$1.apply(java.lang.Object)
>>>>>> @bci=2, line=244 (Compiled frame)
>>>>>>  -
>>>>>> scala.collection.IndexedSeqOptimized$class.foreach(scala.collection.IndexedSeqOptimized,
>>>>>> scala.Function1) @bci=22, line=33 (Compiled frame)
>>>>>>  - scala.collection.mutable.ArrayOps$ofRef.foreach(scala.Function1)
>>>>>> @bci=2, line=108 (Compiled frame)
>>>>>>  -
>>>>>> scala.collection.TraversableLike$class.map(scala.collection.TraversableLike,
>>>>>> scala.Function1, scala.collection.generic.CanBuildFrom) @bci=17, line=244
>>>>>> (Compiled frame)
>>>>>>  - scala.collection.mutable.ArrayOps$ofRef.map(scala.Function1,
>>>>>> scala.collection.generic.CanBuildFrom) @bci=3, line=108 (Interpreted frame)
>>>>>>  -
>>>>>> org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache.listLeafFiles(java.lang.String[])
>>>>>> @bci=279, line=447 (Interpreted frame)
>>>>>>  -
>>>>>> org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache.refresh()
>>>>>> @bci=8, line=453 (Interpreted frame)
>>>>>>  - org.apache.spark.sql.sources.HadoopFsRelation.org$apache$spark$sql$sources$HadoopFsRelation$$fileStatusCache$lzycompute()
>>>>>> @bci=26, line=465 (Interpreted frame)
>>>>>>  - org.apache.spark.sql.sources.HadoopFsRelation.org$apache$spark$sql$sources$HadoopFsRelation$$fileStatusCache()
>>>>>> @bci=12, line=463 (Interpreted frame)
>>>>>>  - org.apache.spark.sql.sources.HadoopFsRelation.refresh() @bci=1,
>>>>>> line=540 (Interpreted frame)
>>>>>>  -
>>>>>> org.apache.spark.sql.execution.datasources.parquet.ParquetRelation.refresh()
>>>>>> @bci=1, line=204 (Interpreted frame)
>>>>>>  -
>>>>>> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply$mcV$sp()
>>>>>> @bci=392, line=152 (Interpreted frame)
>>>>>>  -
>>>>>> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply()
>>>>>> @bci=1, line=108 (Interpreted frame)
>>>>>>  -
>>>>>> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply()
>>>>>> @bci=1, line=108 (Interpreted frame)
>>>>>>  -
>>>>>> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(org.apache.spark.sql.SQLContext,
>>>>>> org.apache.spark.sql.SQLContext$QueryExecution, scala.Function0) @bci=96,
>>>>>> line=56 (Interpreted frame)
>>>>>>  -
>>>>>> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(org.apache.spark.sql.SQLContext)
>>>>>> @bci=718, line=108 (Interpreted frame)
>>>>>>  -
>>>>>> org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute()
>>>>>> @bci=20, line=57 (Interpreted frame)
>>>>>>  - org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult()
>>>>>> @bci=15, line=57 (Interpreted frame)
>>>>>>  - org.apache.spark.sql.execution.ExecutedCommand.doExecute()
>>>>>> @bci=12, line=69 (Interpreted frame)
>>>>>>  -
>>>>>> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply()
>>>>>> @bci=11, line=140 (Interpreted frame)
>>>>>>  -
>>>>>> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply() @bci=1,
>>>>>> line=138 (Interpreted frame)
>>>>>>  -
>>>>>> org.apache.spark.rdd.RDDOperationScope$.withScope(org.apache.spark.SparkContext,
>>>>>> java.lang.String, boolean, boolean, scala.Function0) @bci=131, line=147
>>>>>> (Interpreted frame)
>>>>>>  - org.apache.spark.sql.execution.SparkPlan.execute() @bci=189,
>>>>>> line=138 (Interpreted frame)
>>>>>>  - org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute()
>>>>>> @bci=21, line=933 (Interpreted frame)
>>>>>>  - org.apache.spark.sql.SQLContext$QueryExecution.toRdd() @bci=13,
>>>>>> line=933 (Interpreted frame)
>>>>>>  -
>>>>>> org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(org.apache.spark.sql.SQLContext,
>>>>>> java.lang.String, java.lang.String[], org.apache.spark.sql.SaveMode,
>>>>>> scala.collection.immutable.Map, org.apache.spark.sql.DataFrame) @bci=293,
>>>>>> line=197 (Interpreted frame)
>>>>>>  - org.apache.spark.sql.DataFrameWriter.save() @bci=64, line=146
>>>>>> (Interpreted frame)
>>>>>>  - org.apache.spark.sql.DataFrameWriter.save(java.lang.String)
>>>>>> @bci=24, line=137 (Interpreted frame)
>>>>>>  - org.apache.spark.sql.DataFrameWriter.parquet(java.lang.String)
>>>>>> @bci=8, line=304 (Interpreted frame)
>>>>>>
>>>>>> Best Regards,
>>>>>>
>>>>>> Jerry
>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
>

Re: [Spark SQL]: Spark Job Hangs on the refresh method when saving over 1 million files

Posted by Jerry Lam <ch...@gmail.com>.
Hi Fengdong,

Why does it need more memory on the driver side when there are many partitions? It seems the implementation can only support use cases with a dozen or so partitions; once it is over 100, it falls apart. It is also quite slow to initialize the loading of partitioned tables when the number of partitions is over 100.

Best Regards,

Jerry

Sent from my iPhone

> On 26 Oct, 2015, at 2:50 am, Fengdong Yu <fe...@everstring.com> wrote:
> 
> How many partitions you generated?
> if Millions generated, then there is a huge memory consumed.
> 
> 
> 
> 
> 
>> On Oct 26, 2015, at 10:58 AM, Jerry Lam <ch...@gmail.com> wrote:
>> 
>> Hi guys,
>> 
>> I mentioned that the partitions are generated so I tried to read the partition data from it. The driver is OOM after few minutes. The stack trace is below. It looks very similar to the the jstack above (note on the refresh method). Thanks!
>> 
>> Name: java.lang.OutOfMemoryError
>> Message: GC overhead limit exceeded
>> StackTrace: java.util.Arrays.copyOf(Arrays.java:2367)
>> java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:130)
>> java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:114)
>> java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:415)
>> java.lang.StringBuilder.append(StringBuilder.java:132)
>> org.apache.hadoop.fs.Path.toString(Path.java:384)
>> org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache$$anonfun$listLeafFiles$1.apply(interfaces.scala:447)
>> org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache$$anonfun$listLeafFiles$1.apply(interfaces.scala:447)
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>> scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>> scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>> scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
>> org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache.listLeafFiles(interfaces.scala:447)
>> org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache.refresh(interfaces.scala:453)
>> org.apache.spark.sql.sources.HadoopFsRelation.org$apache$spark$sql$sources$HadoopFsRelation$$fileStatusCache$lzycompute(interfaces.scala:465)
>> org.apache.spark.sql.sources.HadoopFsRelation.org$apache$spark$sql$sources$HadoopFsRelation$$fileStatusCache(interfaces.scala:463)
>> org.apache.spark.sql.sources.HadoopFsRelation.cachedLeafStatuses(interfaces.scala:470)
>> org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$MetadataCache.refresh(ParquetRelation.scala:381)
>> org.apache.spark.sql.execution.datasources.parquet.ParquetRelation.org$apache$spark$sql$execution$datasources$parquet$ParquetRelation$$metadataCache$lzycompute(ParquetRelation.scala:145)
>> org.apache.spark.sql.execution.datasources.parquet.ParquetRelation.org$apache$spark$sql$execution$datasources$parquet$ParquetRelation$$metadataCache(ParquetRelation.scala:143)
>> org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$$anonfun$6.apply(ParquetRelation.scala:196)
>> org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$$anonfun$6.apply(ParquetRelation.scala:196)
>> scala.Option.getOrElse(Option.scala:120)
>> org.apache.spark.sql.execution.datasources.parquet.ParquetRelation.dataSchema(ParquetRelation.scala:196)
>> org.apache.spark.sql.sources.HadoopFsRelation.schema$lzycompute(interfaces.scala:561)
>> org.apache.spark.sql.sources.HadoopFsRelation.schema(interfaces.scala:560)
>> org.apache.spark.sql.execution.datasources.LogicalRelation.<init>(LogicalRelation.scala:31)
>> org.apache.spark.sql.SQLContext.baseRelationToDataFrame(SQLContext.scala:395)
>> org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:267)
>> 
>>> On Sun, Oct 25, 2015 at 10:25 PM, Jerry Lam <ch...@gmail.com> wrote:
>>> Hi Josh,
>>> 
>>> No I don't have speculation enabled. The driver took about few hours until it was OOM. Interestingly, all partitions are generated successfully (_SUCCESS file is written in the output directory). Is there a reason why the driver needs so much memory? The jstack revealed that it called refresh some file statuses. Is there a way to avoid OutputCommitCoordinator to use so much memory? 
>>> 
>>> Ultimately, I choose to use partitions because most of the queries I have will execute based the partition field. For example, "SELECT events from customer where customer_id = 1234". If the partition is based on customer_id, all events for a customer can be easily retrieved without filtering the entire dataset which is much more efficient (I hope). However, I notice that the implementation of the partition logic does not seem to allow this type of use cases without using a lot of memory which is a bit odd in my opinion. Any help will be greatly appreciated.
>>> 
>>> Best Regards,
>>> 
>>> Jerry
>>> 
>>> 
>>> 
>>>> On Sun, Oct 25, 2015 at 9:25 PM, Josh Rosen <ro...@gmail.com> wrote:
>>>> Hi Jerry,
>>>> 
>>>> Do you have speculation enabled? A write which produces one million files / output partitions might be using tons of driver memory via the OutputCommitCoordinator's bookkeeping data structures.
>>>> 
>>>>> On Sun, Oct 25, 2015 at 5:50 PM, Jerry Lam <ch...@gmail.com> wrote:
>>>>> Hi spark guys,
>>>>> 
>>>>> I think I hit the same issue SPARK-8890 https://issues.apache.org/jira/browse/SPARK-8890. It is marked as resolved. However it is not. I have over a million output directories for 1 single column in partitionBy. Not sure if this is a regression issue? Do I need to set some parameters to make it more memory efficient?
>>>>> 
>>>>> Best Regards,
>>>>> 
>>>>> Jerry
>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>>>> On Sun, Oct 25, 2015 at 8:39 PM, Jerry Lam <ch...@gmail.com> wrote:
>>>>>> Hi guys,
>>>>>> 
>>>>>> After waiting for a day, it actually causes OOM on the spark driver. I configure the driver to have 6GB. Note that I didn't call refresh myself. The method was called when saving the dataframe in parquet format. Also I'm using partitionBy() on the DataFrameWriter to generate over 1 million files. Not sure why it OOM the driver after the job is marked _SUCCESS in the output folder. 
>>>>>> 
>>>>>> Best Regards,
>>>>>> 
>>>>>> Jerry
>>>>>> 
>>>>>> 
>>>>>>> On Sat, Oct 24, 2015 at 9:35 PM, Jerry Lam <ch...@gmail.com> wrote:
>>>>>>> Hi Spark users and developers,
>>>>>>> 
>>>>>>> Does anyone encounter any issue when a spark SQL job produces a lot of files (over 1 millions), the job hangs on the refresh method? I'm using spark 1.5.1. Below is the stack trace. I saw the parquet files are produced but the driver is doing something very intensively (it uses all the cpus). Does it mean Spark SQL cannot be used to produce over 1 million files in a single job?
>>>>>>> 
>>>>>>> Thread 528: (state = BLOCKED)
>>>>>>>  - java.util.Arrays.copyOf(char[], int) @bci=1, line=2367 (Compiled frame)
>>>>>>>  - java.lang.AbstractStringBuilder.expandCapacity(int) @bci=43, line=130 (Compiled frame)
>>>>>>>  - java.lang.AbstractStringBuilder.ensureCapacityInternal(int) @bci=12, line=114 (Compiled frame)
>>>>>>>  - java.lang.AbstractStringBuilder.append(java.lang.String) @bci=19, line=415 (Compiled frame)
>>>>>>>  - java.lang.StringBuilder.append(java.lang.String) @bci=2, line=132 (Compiled frame)
>>>>>>>  - org.apache.hadoop.fs.Path.toString() @bci=128, line=384 (Compiled frame)
>>>>>>>  - org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache$$anonfun$listLeafFiles$1.apply(org.apache.hadoop.fs.FileStatus) @bci=4, line=447 (Compiled frame)
>>>>>>>  - org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache$$anonfun$listLeafFiles$1.apply(java.lang.Object) @bci=5, line=447 (Compiled frame)
>>>>>>>  - scala.collection.TraversableLike$$anonfun$map$1.apply(java.lang.Object) @bci=9, line=244 (Compiled frame)
>>>>>>>  - scala.collection.TraversableLike$$anonfun$map$1.apply(java.lang.Object) @bci=2, line=244 (Compiled frame)
>>>>>>>  - scala.collection.IndexedSeqOptimized$class.foreach(scala.collection.IndexedSeqOptimized, scala.Function1) @bci=22, line=33 (Compiled frame)
>>>>>>>  - scala.collection.mutable.ArrayOps$ofRef.foreach(scala.Function1) @bci=2, line=108 (Compiled frame)
>>>>>>>  - scala.collection.TraversableLike$class.map(scala.collection.TraversableLike, scala.Function1, scala.collection.generic.CanBuildFrom) @bci=17, line=244 (Compiled frame)
>>>>>>>  - scala.collection.mutable.ArrayOps$ofRef.map(scala.Function1, scala.collection.generic.CanBuildFrom) @bci=3, line=108 (Interpreted frame)
>>>>>>>  - org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache.listLeafFiles(java.lang.String[]) @bci=279, line=447 (Interpreted frame)
>>>>>>>  - org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache.refresh() @bci=8, line=453 (Interpreted frame)
>>>>>>>  - org.apache.spark.sql.sources.HadoopFsRelation.org$apache$spark$sql$sources$HadoopFsRelation$$fileStatusCache$lzycompute() @bci=26, line=465 (Interpreted frame)
>>>>>>>  - org.apache.spark.sql.sources.HadoopFsRelation.org$apache$spark$sql$sources$HadoopFsRelation$$fileStatusCache() @bci=12, line=463 (Interpreted frame)
>>>>>>>  - org.apache.spark.sql.sources.HadoopFsRelation.refresh() @bci=1, line=540 (Interpreted frame)
>>>>>>>  - org.apache.spark.sql.execution.datasources.parquet.ParquetRelation.refresh() @bci=1, line=204 (Interpreted frame)
>>>>>>>  - org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply$mcV$sp() @bci=392, line=152 (Interpreted frame)
>>>>>>>  - org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply() @bci=1, line=108 (Interpreted frame)
>>>>>>>  - org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply() @bci=1, line=108 (Interpreted frame)
>>>>>>>  - org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(org.apache.spark.sql.SQLContext, org.apache.spark.sql.SQLContext$QueryExecution, scala.Function0) @bci=96, line=56 (Interpreted frame)
>>>>>>>  - org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(org.apache.spark.sql.SQLContext) @bci=718, line=108 (Interpreted frame)
>>>>>>>  - org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute() @bci=20, line=57 (Interpreted frame)
>>>>>>>  - org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult() @bci=15, line=57 (Interpreted frame)
>>>>>>>  - org.apache.spark.sql.execution.ExecutedCommand.doExecute() @bci=12, line=69 (Interpreted frame)
>>>>>>>  - org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply() @bci=11, line=140 (Interpreted frame)
>>>>>>>  - org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply() @bci=1, line=138 (Interpreted frame)
>>>>>>>  - org.apache.spark.rdd.RDDOperationScope$.withScope(org.apache.spark.SparkContext, java.lang.String, boolean, boolean, scala.Function0) @bci=131, line=147 (Interpreted frame)
>>>>>>>  - org.apache.spark.sql.execution.SparkPlan.execute() @bci=189, line=138 (Interpreted frame)
>>>>>>>  - org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute() @bci=21, line=933 (Interpreted frame)
>>>>>>>  - org.apache.spark.sql.SQLContext$QueryExecution.toRdd() @bci=13, line=933 (Interpreted frame)
>>>>>>>  - org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(org.apache.spark.sql.SQLContext, java.lang.String, java.lang.String[], org.apache.spark.sql.SaveMode, scala.collection.immutable.Map, org.apache.spark.sql.DataFrame) @bci=293, line=197 (Interpreted frame)
>>>>>>>  - org.apache.spark.sql.DataFrameWriter.save() @bci=64, line=146 (Interpreted frame)
>>>>>>>  - org.apache.spark.sql.DataFrameWriter.save(java.lang.String) @bci=24, line=137 (Interpreted frame)
>>>>>>>  - org.apache.spark.sql.DataFrameWriter.parquet(java.lang.String) @bci=8, line=304 (Interpreted frame)
>>>>>>> 
>>>>>>> Best Regards,
>>>>>>> 
>>>>>>> Jerry
> 

Re: [Spark SQL]: Spark Job Hangs on the refresh method when saving over 1 million files

Posted by Fengdong Yu <fe...@everstring.com>.
How many partitions did you generate?
If millions were generated, then a huge amount of memory is consumed.
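
One way to answer this before writing is to count the distinct values of the
partition column. A small sketch below, assuming the spark-shell's built-in
sqlContext and hypothetical path/column names:

// Hypothetical path and column; each distinct customer_id becomes one
// partition directory when written with partitionBy("customer_id").
val df = sqlContext.read.parquet("/data/events_raw")
val numPartitionDirs = df.select("customer_id").distinct().count()
println(s"partitionBy(customer_id) would create roughly $numPartitionDirs directories")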





> On Oct 26, 2015, at 10:58 AM, Jerry Lam <ch...@gmail.com> wrote:
> 
> Hi guys,
> 
> I mentioned that the partitions are generated so I tried to read the partition data from it. The driver is OOM after few minutes. The stack trace is below. It looks very similar to the the jstack above (note on the refresh method). Thanks!
> 
> Name: java.lang.OutOfMemoryError
> Message: GC overhead limit exceeded
> StackTrace: java.util.Arrays.copyOf(Arrays.java:2367)
> java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:130)
> java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:114)
> java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:415)
> java.lang.StringBuilder.append(StringBuilder.java:132)
> org.apache.hadoop.fs.Path.toString(Path.java:384)
> org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache$$anonfun$listLeafFiles$1.apply(interfaces.scala:447)
> org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache$$anonfun$listLeafFiles$1.apply(interfaces.scala:447)
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
> scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
> scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
> org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache.listLeafFiles(interfaces.scala:447)
> org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache.refresh(interfaces.scala:453)
> org.apache.spark.sql.sources.HadoopFsRelation.org$apache$spark$sql$sources$HadoopFsRelation$$fileStatusCache$lzycompute(interfaces.scala:465)
> org.apache.spark.sql.sources.HadoopFsRelation.org$apache$spark$sql$sources$HadoopFsRelation$$fileStatusCache(interfaces.scala:463)
> org.apache.spark.sql.sources.HadoopFsRelation.cachedLeafStatuses(interfaces.scala:470)
> org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$MetadataCache.refresh(ParquetRelation.scala:381)
> org.apache.spark.sql.execution.datasources.parquet.ParquetRelation.org$apache$spark$sql$execution$datasources$parquet$ParquetRelation$$metadataCache$lzycompute(ParquetRelation.scala:145)
> org.apache.spark.sql.execution.datasources.parquet.ParquetRelation.org$apache$spark$sql$execution$datasources$parquet$ParquetRelation$$metadataCache(ParquetRelation.scala:143)
> org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$$anonfun$6.apply(ParquetRelation.scala:196)
> org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$$anonfun$6.apply(ParquetRelation.scala:196)
> scala.Option.getOrElse(Option.scala:120)
> org.apache.spark.sql.execution.datasources.parquet.ParquetRelation.dataSchema(ParquetRelation.scala:196)
> org.apache.spark.sql.sources.HadoopFsRelation.schema$lzycompute(interfaces.scala:561)
> org.apache.spark.sql.sources.HadoopFsRelation.schema(interfaces.scala:560)
> org.apache.spark.sql.execution.datasources.LogicalRelation.<init>(LogicalRelation.scala:31)
> org.apache.spark.sql.SQLContext.baseRelationToDataFrame(SQLContext.scala:395)
> org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:267)
> 
> On Sun, Oct 25, 2015 at 10:25 PM, Jerry Lam <chilinglam@gmail.com> wrote:
> Hi Josh,
> 
> No I don't have speculation enabled. The driver took about few hours until it was OOM. Interestingly, all partitions are generated successfully (_SUCCESS file is written in the output directory). Is there a reason why the driver needs so much memory? The jstack revealed that it called refresh some file statuses. Is there a way to avoid OutputCommitCoordinator to use so much memory? 
> 
> Ultimately, I choose to use partitions because most of the queries I have will execute based the partition field. For example, "SELECT events from customer where customer_id = 1234". If the partition is based on customer_id, all events for a customer can be easily retrieved without filtering the entire dataset which is much more efficient (I hope). However, I notice that the implementation of the partition logic does not seem to allow this type of use cases without using a lot of memory which is a bit odd in my opinion. Any help will be greatly appreciated.
> 
> Best Regards,
> 
> Jerry
> 
> 
> 
> On Sun, Oct 25, 2015 at 9:25 PM, Josh Rosen <rosenville@gmail.com> wrote:
> Hi Jerry,
> 
> Do you have speculation enabled? A write which produces one million files / output partitions might be using tons of driver memory via the OutputCommitCoordinator's bookkeeping data structures.
> 
> On Sun, Oct 25, 2015 at 5:50 PM, Jerry Lam <chilinglam@gmail.com> wrote:
> Hi spark guys,
> 
> I think I hit the same issue SPARK-8890 https://issues.apache.org/jira/browse/SPARK-8890. It is marked as resolved. However it is not. I have over a million output directories for 1 single column in partitionBy. Not sure if this is a regression issue? Do I need to set some parameters to make it more memory efficient?
> 
> Best Regards,
> 
> Jerry
> 
> 
> 
> 
> On Sun, Oct 25, 2015 at 8:39 PM, Jerry Lam <chilinglam@gmail.com> wrote:
> Hi guys,
> 
> After waiting for a day, it actually causes OOM on the spark driver. I configure the driver to have 6GB. Note that I didn't call refresh myself. The method was called when saving the dataframe in parquet format. Also I'm using partitionBy() on the DataFrameWriter to generate over 1 million files. Not sure why it OOM the driver after the job is marked _SUCCESS in the output folder. 
> 
> Best Regards,
> 
> Jerry
> 
> 
> On Sat, Oct 24, 2015 at 9:35 PM, Jerry Lam <chilinglam@gmail.com> wrote:
> Hi Spark users and developers,
> 
> Does anyone encounter any issue when a spark SQL job produces a lot of files (over 1 millions), the job hangs on the refresh method? I'm using spark 1.5.1. Below is the stack trace. I saw the parquet files are produced but the driver is doing something very intensively (it uses all the cpus). Does it mean Spark SQL cannot be used to produce over 1 million files in a single job?
> 
> Thread 528: (state = BLOCKED)
>  - java.util.Arrays.copyOf(char[], int) @bci=1, line=2367 (Compiled frame)
>  - java.lang.AbstractStringBuilder.expandCapacity(int) @bci=43, line=130 (Compiled frame)
>  - java.lang.AbstractStringBuilder.ensureCapacityInternal(int) @bci=12, line=114 (Compiled frame)
>  - java.lang.AbstractStringBuilder.append(java.lang.String) @bci=19, line=415 (Compiled frame)
>  - java.lang.StringBuilder.append(java.lang.String) @bci=2, line=132 (Compiled frame)
>  - org.apache.hadoop.fs.Path.toString() @bci=128, line=384 (Compiled frame)
>  - org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache$$anonfun$listLeafFiles$1.apply(org.apache.hadoop.fs.FileStatus) @bci=4, line=447 (Compiled frame)
>  - org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache$$anonfun$listLeafFiles$1.apply(java.lang.Object) @bci=5, line=447 (Compiled frame)
>  - scala.collection.TraversableLike$$anonfun$map$1.apply(java.lang.Object) @bci=9, line=244 (Compiled frame)
>  - scala.collection.TraversableLike$$anonfun$map$1.apply(java.lang.Object) @bci=2, line=244 (Compiled frame)
>  - scala.collection.IndexedSeqOptimized$class.foreach(scala.collection.IndexedSeqOptimized, scala.Function1) @bci=22, line=33 (Compiled frame)
>  - scala.collection.mutable.ArrayOps$ofRef.foreach(scala.Function1) @bci=2, line=108 (Compiled frame)
>  - scala.collection.TraversableLike$class.map(scala.collection.TraversableLike, scala.Function1, scala.collection.generic.CanBuildFrom) @bci=17, line=244 (Compiled frame)
>  - scala.collection.mutable.ArrayOps$ofRef.map(scala.Function1, scala.collection.generic.CanBuildFrom) @bci=3, line=108 (Interpreted frame)
>  - org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache.listLeafFiles(java.lang.String[]) @bci=279, line=447 (Interpreted frame)
>  - org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache.refresh() @bci=8, line=453 (Interpreted frame)
>  - org.apache.spark.sql.sources.HadoopFsRelation.org$apache$spark$sql$sources$HadoopFsRelation$$fileStatusCache$lzycompute() @bci=26, line=465 (Interpreted frame)
>  - org.apache.spark.sql.sources.HadoopFsRelation.org$apache$spark$sql$sources$HadoopFsRelation$$fileStatusCache() @bci=12, line=463 (Interpreted frame)
>  - org.apache.spark.sql.sources.HadoopFsRelation.refresh() @bci=1, line=540 (Interpreted frame)
>  - org.apache.spark.sql.execution.datasources.parquet.ParquetRelation.refresh() @bci=1, line=204 (Interpreted frame)
>  - org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply$mcV$sp() @bci=392, line=152 (Interpreted frame)
>  - org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply() @bci=1, line=108 (Interpreted frame)
>  - org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply() @bci=1, line=108 (Interpreted frame)
>  - org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(org.apache.spark.sql.SQLContext, org.apache.spark.sql.SQLContext$QueryExecution, scala.Function0) @bci=96, line=56 (Interpreted frame)
>  - org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(org.apache.spark.sql.SQLContext) @bci=718, line=108 (Interpreted frame)
>  - org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute() @bci=20, line=57 (Interpreted frame)
>  - org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult() @bci=15, line=57 (Interpreted frame)
>  - org.apache.spark.sql.execution.ExecutedCommand.doExecute() @bci=12, line=69 (Interpreted frame)
>  - org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply() @bci=11, line=140 (Interpreted frame)
>  - org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply() @bci=1, line=138 (Interpreted frame)
>  - org.apache.spark.rdd.RDDOperationScope$.withScope(org.apache.spark.SparkContext, java.lang.String, boolean, boolean, scala.Function0) @bci=131, line=147 (Interpreted frame)
>  - org.apache.spark.sql.execution.SparkPlan.execute() @bci=189, line=138 (Interpreted frame)
>  - org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute() @bci=21, line=933 (Interpreted frame)
>  - org.apache.spark.sql.SQLContext$QueryExecution.toRdd() @bci=13, line=933 (Interpreted frame)
>  - org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(org.apache.spark.sql.SQLContext, java.lang.String, java.lang.String[], org.apache.spark.sql.SaveMode, scala.collection.immutable.Map, org.apache.spark.sql.DataFrame) @bci=293, line=197 (Interpreted frame)
>  - org.apache.spark.sql.DataFrameWriter.save() @bci=64, line=146 (Interpreted frame)
>  - org.apache.spark.sql.DataFrameWriter.save(java.lang.String) @bci=24, line=137 (Interpreted frame)
>  - org.apache.spark.sql.DataFrameWriter.parquet(java.lang.String) @bci=8, line=304 (Interpreted frame)
> 
> Best Regards,
> 
> Jerry
> 
> 
> 
> 
> 
> 
> 


Re: [Spark SQL]: Spark Job Hangs on the refresh method when saving over 1 million files

Posted by Jerry Lam <ch...@gmail.com>.
Hi guys,

I mentioned that the partitions are generated, so I tried to read the
partition data from them. The driver went OOM after a few minutes. The
stack trace is below. It looks very similar to the jstack above (note the
refresh method). Thanks!

Name: java.lang.OutOfMemoryError
Message: GC overhead limit exceeded
StackTrace: java.util.Arrays.copyOf(Arrays.java:2367)
java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:130)
java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:114)
java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:415)
java.lang.StringBuilder.append(StringBuilder.java:132)
org.apache.hadoop.fs.Path.toString(Path.java:384)
org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache$$anonfun$listLeafFiles$1.apply(interfaces.scala:447)
org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache$$anonfun$listLeafFiles$1.apply(interfaces.scala:447)
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache.listLeafFiles(interfaces.scala:447)
org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache.refresh(interfaces.scala:453)
org.apache.spark.sql.sources.HadoopFsRelation.org$apache$spark$sql$sources$HadoopFsRelation$$fileStatusCache$lzycompute(interfaces.scala:465)
org.apache.spark.sql.sources.HadoopFsRelation.org$apache$spark$sql$sources$HadoopFsRelation$$fileStatusCache(interfaces.scala:463)
org.apache.spark.sql.sources.HadoopFsRelation.cachedLeafStatuses(interfaces.scala:470)
org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$MetadataCache.refresh(ParquetRelation.scala:381)
org.apache.spark.sql.execution.datasources.parquet.ParquetRelation.org$apache$spark$sql$execution$datasources$parquet$ParquetRelation$$metadataCache$lzycompute(ParquetRelation.scala:145)
org.apache.spark.sql.execution.datasources.parquet.ParquetRelation.org$apache$spark$sql$execution$datasources$parquet$ParquetRelation$$metadataCache(ParquetRelation.scala:143)
org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$$anonfun$6.apply(ParquetRelation.scala:196)
org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$$anonfun$6.apply(ParquetRelation.scala:196)
scala.Option.getOrElse(Option.scala:120)
org.apache.spark.sql.execution.datasources.parquet.ParquetRelation.dataSchema(ParquetRelation.scala:196)
org.apache.spark.sql.sources.HadoopFsRelation.schema$lzycompute(interfaces.scala:561)
org.apache.spark.sql.sources.HadoopFsRelation.schema(interfaces.scala:560)
org.apache.spark.sql.execution.datasources.LogicalRelation.<init>(LogicalRelation.scala:31)
org.apache.spark.sql.SQLContext.baseRelationToDataFrame(SQLContext.scala:395)
org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:267)
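
The bottom frame above (DataFrameReader.parquet) corresponds to a plain read
of the partitioned output. A minimal sketch of such a read, with a
hypothetical path and assuming the spark-shell's built-in sqlContext:

// Hypothetical path. Resolving the Parquet relation eagerly walks every
// partition directory and caches a FileStatus per leaf file on the driver
// (HadoopFsRelation.cachedLeafStatuses in the trace) before any query runs,
// which appears to be where the driver memory goes with ~1M files.
val events = sqlContext.read.parquet("/data/events_by_customer")
events.printSchema()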


On Sun, Oct 25, 2015 at 10:25 PM, Jerry Lam <ch...@gmail.com> wrote:

> Hi Josh,
>
> No I don't have speculation enabled. The driver took about few hours until
> it was OOM. Interestingly, all partitions are generated successfully
> (_SUCCESS file is written in the output directory). Is there a reason why
> the driver needs so much memory? The jstack revealed that it called refresh
> some file statuses. Is there a way to avoid OutputCommitCoordinator to
> use so much memory?
>
> Ultimately, I choose to use partitions because most of the queries I have
> will execute based the partition field. For example, "SELECT events from
> customer where customer_id = 1234". If the partition is based on
> customer_id, all events for a customer can be easily retrieved without
> filtering the entire dataset which is much more efficient (I hope).
> However, I notice that the implementation of the partition logic does not
> seem to allow this type of use cases without using a lot of memory which is
> a bit odd in my opinion. Any help will be greatly appreciated.
>
> Best Regards,
>
> Jerry
>
>
>
> On Sun, Oct 25, 2015 at 9:25 PM, Josh Rosen <ro...@gmail.com> wrote:
>
>> Hi Jerry,
>>
>> Do you have speculation enabled? A write which produces one million files
>> / output partitions might be using tons of driver memory via the
>> OutputCommitCoordinator's bookkeeping data structures.
>>
>> On Sun, Oct 25, 2015 at 5:50 PM, Jerry Lam <ch...@gmail.com> wrote:
>>
>>> Hi spark guys,
>>>
>>> I think I hit the same issue SPARK-8890
>>> https://issues.apache.org/jira/browse/SPARK-8890. It is marked as
>>> resolved. However it is not. I have over a million output directories for 1
>>> single column in partitionBy. Not sure if this is a regression issue? Do I
>>> need to set some parameters to make it more memory efficient?
>>>
>>> Best Regards,
>>>
>>> Jerry
>>>
>>>
>>>
>>>
>>> On Sun, Oct 25, 2015 at 8:39 PM, Jerry Lam <ch...@gmail.com> wrote:
>>>
>>>> Hi guys,
>>>>
>>>> After waiting for a day, it actually causes OOM on the spark driver. I
>>>> configure the driver to have 6GB. Note that I didn't call refresh myself.
>>>> The method was called when saving the dataframe in parquet format. Also I'm
>>>> using partitionBy() on the DataFrameWriter to generate over 1 million
>>>> files. Not sure why it OOM the driver after the job is marked _SUCCESS in
>>>> the output folder.
>>>>
>>>> Best Regards,
>>>>
>>>> Jerry
>>>>
>>>>
>>>> On Sat, Oct 24, 2015 at 9:35 PM, Jerry Lam <ch...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Spark users and developers,
>>>>>
>>>>> Does anyone encounter any issue when a spark SQL job produces a lot of
>>>>> files (over 1 millions), the job hangs on the refresh method? I'm using
>>>>> spark 1.5.1. Below is the stack trace. I saw the parquet files are produced
>>>>> but the driver is doing something very intensively (it uses all the cpus).
>>>>> Does it mean Spark SQL cannot be used to produce over 1 million files in a
>>>>> single job?
>>>>>
>>>>> Thread 528: (state = BLOCKED)
>>>>>  - java.util.Arrays.copyOf(char[], int) @bci=1, line=2367 (Compiled
>>>>> frame)
>>>>>  - java.lang.AbstractStringBuilder.expandCapacity(int) @bci=43,
>>>>> line=130 (Compiled frame)
>>>>>  - java.lang.AbstractStringBuilder.ensureCapacityInternal(int)
>>>>> @bci=12, line=114 (Compiled frame)
>>>>>  - java.lang.AbstractStringBuilder.append(java.lang.String) @bci=19,
>>>>> line=415 (Compiled frame)
>>>>>  - java.lang.StringBuilder.append(java.lang.String) @bci=2, line=132
>>>>> (Compiled frame)
>>>>>  - org.apache.hadoop.fs.Path.toString() @bci=128, line=384 (Compiled
>>>>> frame)
>>>>>  -
>>>>> org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache$$anonfun$listLeafFiles$1.apply(org.apache.hadoop.fs.FileStatus)
>>>>> @bci=4, line=447 (Compiled frame)
>>>>>  -
>>>>> org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache$$anonfun$listLeafFiles$1.apply(java.lang.Object)
>>>>> @bci=5, line=447 (Compiled frame)
>>>>>  -
>>>>> scala.collection.TraversableLike$$anonfun$map$1.apply(java.lang.Object)
>>>>> @bci=9, line=244 (Compiled frame)
>>>>>  -
>>>>> scala.collection.TraversableLike$$anonfun$map$1.apply(java.lang.Object)
>>>>> @bci=2, line=244 (Compiled frame)
>>>>>  -
>>>>> scala.collection.IndexedSeqOptimized$class.foreach(scala.collection.IndexedSeqOptimized,
>>>>> scala.Function1) @bci=22, line=33 (Compiled frame)
>>>>>  - scala.collection.mutable.ArrayOps$ofRef.foreach(scala.Function1)
>>>>> @bci=2, line=108 (Compiled frame)
>>>>>  -
>>>>> scala.collection.TraversableLike$class.map(scala.collection.TraversableLike,
>>>>> scala.Function1, scala.collection.generic.CanBuildFrom) @bci=17, line=244
>>>>> (Compiled frame)
>>>>>  - scala.collection.mutable.ArrayOps$ofRef.map(scala.Function1,
>>>>> scala.collection.generic.CanBuildFrom) @bci=3, line=108 (Interpreted frame)
>>>>>  -
>>>>> org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache.listLeafFiles(java.lang.String[])
>>>>> @bci=279, line=447 (Interpreted frame)
>>>>>  -
>>>>> org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache.refresh()
>>>>> @bci=8, line=453 (Interpreted frame)
>>>>>  - org.apache.spark.sql.sources.HadoopFsRelation.org$apache$spark$sql$sources$HadoopFsRelation$$fileStatusCache$lzycompute()
>>>>> @bci=26, line=465 (Interpreted frame)
>>>>>  - org.apache.spark.sql.sources.HadoopFsRelation.org$apache$spark$sql$sources$HadoopFsRelation$$fileStatusCache()
>>>>> @bci=12, line=463 (Interpreted frame)
>>>>>  - org.apache.spark.sql.sources.HadoopFsRelation.refresh() @bci=1,
>>>>> line=540 (Interpreted frame)
>>>>>  -
>>>>> org.apache.spark.sql.execution.datasources.parquet.ParquetRelation.refresh()
>>>>> @bci=1, line=204 (Interpreted frame)
>>>>>  -
>>>>> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply$mcV$sp()
>>>>> @bci=392, line=152 (Interpreted frame)
>>>>>  -
>>>>> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply()
>>>>> @bci=1, line=108 (Interpreted frame)
>>>>>  -
>>>>> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply()
>>>>> @bci=1, line=108 (Interpreted frame)
>>>>>  -
>>>>> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(org.apache.spark.sql.SQLContext,
>>>>> org.apache.spark.sql.SQLContext$QueryExecution, scala.Function0) @bci=96,
>>>>> line=56 (Interpreted frame)
>>>>>  -
>>>>> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(org.apache.spark.sql.SQLContext)
>>>>> @bci=718, line=108 (Interpreted frame)
>>>>>  -
>>>>> org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute()
>>>>> @bci=20, line=57 (Interpreted frame)
>>>>>  - org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult()
>>>>> @bci=15, line=57 (Interpreted frame)
>>>>>  - org.apache.spark.sql.execution.ExecutedCommand.doExecute() @bci=12,
>>>>> line=69 (Interpreted frame)
>>>>>  - org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply()
>>>>> @bci=11, line=140 (Interpreted frame)
>>>>>  - org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply()
>>>>> @bci=1, line=138 (Interpreted frame)
>>>>>  -
>>>>> org.apache.spark.rdd.RDDOperationScope$.withScope(org.apache.spark.SparkContext,
>>>>> java.lang.String, boolean, boolean, scala.Function0) @bci=131, line=147
>>>>> (Interpreted frame)
>>>>>  - org.apache.spark.sql.execution.SparkPlan.execute() @bci=189,
>>>>> line=138 (Interpreted frame)
>>>>>  - org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute()
>>>>> @bci=21, line=933 (Interpreted frame)
>>>>>  - org.apache.spark.sql.SQLContext$QueryExecution.toRdd() @bci=13,
>>>>> line=933 (Interpreted frame)
>>>>>  -
>>>>> org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(org.apache.spark.sql.SQLContext,
>>>>> java.lang.String, java.lang.String[], org.apache.spark.sql.SaveMode,
>>>>> scala.collection.immutable.Map, org.apache.spark.sql.DataFrame) @bci=293,
>>>>> line=197 (Interpreted frame)
>>>>>  - org.apache.spark.sql.DataFrameWriter.save() @bci=64, line=146
>>>>> (Interpreted frame)
>>>>>  - org.apache.spark.sql.DataFrameWriter.save(java.lang.String)
>>>>> @bci=24, line=137 (Interpreted frame)
>>>>>  - org.apache.spark.sql.DataFrameWriter.parquet(java.lang.String)
>>>>> @bci=8, line=304 (Interpreted frame)
>>>>>
>>>>> Best Regards,
>>>>>
>>>>> Jerry
>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>

Re: [Spark SQL]: Spark Job Hangs on the refresh method when saving over 1 million files

Posted by Jerry Lam <ch...@gmail.com>.
Hi Josh,

No, I don't have speculation enabled. The driver ran for a few hours until
it was OOM. Interestingly, all partitions are generated successfully (the
_SUCCESS file is written in the output directory). Is there a reason why
the driver needs so much memory? The jstack revealed that it called refresh
on some file statuses. Is there a way to keep the OutputCommitCoordinator
from using so much memory?

Ultimately, I chose to use partitions because most of my queries filter on
the partition field. For example, "SELECT events FROM customer WHERE
customer_id = 1234". If the data is partitioned by customer_id, all events
for a customer can be retrieved without scanning the entire dataset, which
is much more efficient (I hope). However, I notice that the implementation
of the partitioning logic does not seem to support this type of use case
without using a lot of memory, which is a bit odd in my opinion. Any help
will be greatly appreciated.
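
A minimal sketch of the write/read pattern being described, with
hypothetical paths and columns (Spark 1.5-era DataFrame API):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Hypothetical paths/columns; sketch of the partitionBy write and the
// partition-pruned read described above.
object PartitionedEventsSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("partitioned-events"))
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    val events = sqlContext.read.json("/data/events_raw")

    // Write one directory per customer_id value; with millions of distinct
    // customers this is exactly the layout that stresses the driver here.
    events.write.partitionBy("customer_id").parquet("/data/events_by_customer")

    // Read back with a filter on the partition column so only the matching
    // customer_id=1234 directory should be scanned (partition pruning).
    val oneCustomer = sqlContext.read
      .parquet("/data/events_by_customer")
      .where($"customer_id" === 1234)
    oneCustomer.show()

    sc.stop()
  }
}

Reading a single partition directory directly (e.g. .../customer_id=1234)
is another way to avoid discovering every partition up front, at the cost
of the partition column no longer appearing in the inferred schema.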

Best Regards,

Jerry




Re: [Spark SQL]: Spark Job Hangs on the refresh method when saving over 1 million files

Posted by Josh Rosen <ro...@gmail.com>.
Hi Jerry,

Do you have speculation enabled? A write which produces one million files /
output partitions might be using tons of driver memory via the
OutputCommitCoordinator's bookkeeping data structures.
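
If it does turn out to be enabled, it can be switched off when the context
is created, along these lines (a sketch; the app name is a placeholder):

import org.apache.spark.{SparkConf, SparkContext}

// Disable speculative task execution before the SparkContext is built.
val conf = new SparkConf()
  .setAppName("parquet-writer")  // placeholder
  .set("spark.speculation", "false")
val sc = new SparkContext(conf)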


Re: [Spark SQL]: Spark Job Hangs on the refresh method when saving over 1 million files

Posted by Jerry Lam <ch...@gmail.com>.
Hi spark guys,

I think I hit the same issue as SPARK-8890
(https://issues.apache.org/jira/browse/SPARK-8890). It is marked as
resolved, but it does not seem to be fixed for my case: I have over a
million output directories from a single column in partitionBy. Could this
be a regression? Do I need to set some parameters to make it more memory
efficient?
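
The only workaround I can think of so far is to partition on a coarser,
derived key instead of the raw high-cardinality column, roughly like this
(a sketch; df stands for the DataFrame being written, and the bucket count,
column names, and path are illustrative, so reads would then need to filter
on both the bucket and the original id):

import org.apache.spark.sql.functions.col

// Derive a low-cardinality bucket so partitionBy creates ~1000 directories
// instead of ~1 million, keeping the driver-side file listing manageable.
val bucketed = df.withColumn("customer_bucket", col("customer_id") % 1000)
bucketed.write
  .partitionBy("customer_bucket")
  .parquet("/path/to/output")  // placeholder path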

Best Regards,

Jerry





Re: [Spark SQL]: Spark Job Hangs on the refresh method when saving over 1 million files

Posted by Jerry Lam <ch...@gmail.com>.
Hi guys,

After waiting for a day, it actually caused an OOM on the Spark driver. I
configured the driver with 6GB of memory. Note that I didn't call refresh
myself; the method was invoked while saving the DataFrame in Parquet
format. Also, I'm using partitionBy() on the DataFrameWriter, which
generates over 1 million files. I'm not sure why it OOMs the driver after
the job has already written _SUCCESS to the output folder.
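
For context, the write is essentially the following (a minimal sketch; df,
the partition column, and the output path are stand-ins for my actual job):

// Writing a DataFrame partitioned by a very high-cardinality column; with
// over a million distinct values this creates over a million directories,
// and the post-write refresh() then lists every leaf file on the driver.
df.write
  .partitionBy("customer_id")  // illustrative partition column
  .parquet("/path/to/output")  // placeholder path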

Best Regards,

Jerry

