Posted to user@spark.apache.org by Sea <26...@qq.com> on 2016/07/08 12:44:46 UTC

Re: Bug about reading parquet files

My Spark version is 1.6.1.


== Parsed Logical Plan ==
Aggregate [(count(1),mode=Complete,isDistinct=false) AS count#112L]
+- Limit 1
   +- Aggregate [(count(1),mode=Complete,isDistinct=false) AS events#0L]
      +- Filter ((concat(year#1,month#2,day#3,hour#4) = 2016070700) && (appid#5 = 6))
         +- Subquery dwd_native
            +- Relation[oid#6,eventid#7,clientid#8L,uid#9,passengerid#10L,phone_time#11L,server_time#12L,app_name#13,app_version#14,os_type#15,os_version#16,brand#17,model#18,idfa#19,udid#20,usid#21,ekey#22,mid#23,sdk_version#24,longitude#25,latitude#26,channel#27,tel#28,cip#29,country#30,province#31,city#32,cityid#33,label#34,attrs#35,attrs_n#36,fr#37,year#1,month#2,day#3,hour#4,appid#5] ParquetRelation: omega.dwd_native


== Analyzed Logical Plan ==
count: bigint
Aggregate [(count(1),mode=Complete,isDistinct=false) AS count#112L]
+- Limit 1
   +- Aggregate [(count(1),mode=Complete,isDistinct=false) AS events#0L]
      +- Filter ((concat(year#1,month#2,day#3,hour#4) = 2016070700) && (appid#5 = 6))
         +- Subquery dwd_native
            +- Relation[oid#6,eventid#7,clientid#8L,uid#9,passengerid#10L,phone_time#11L,server_time#12L,app_name#13,app_version#14,os_type#15,os_version#16,brand#17,model#18,idfa#19,udid#20,usid#21,ekey#22,mid#23,sdk_version#24,longitude#25,latitude#26,channel#27,tel#28,cip#29,country#30,province#31,city#32,cityid#33,label#34,attrs#35,attrs_n#36,fr#37,year#1,month#2,day#3,hour#4,appid#5] ParquetRelation: omega.dwd_native


== Optimized Logical Plan ==
Aggregate [(count(1),mode=Complete,isDistinct=false) AS count#112L]
+- Limit 1
   +- Aggregate
      +- Project
         +- Filter ((concat(year#1,month#2,day#3,hour#4) = 2016070700) && (appid#5 = 6))
            +- Relation[oid#6,eventid#7,clientid#8L,uid#9,passengerid#10L,phone_time#11L,server_time#12L,app_name#13,app_version#14,os_type#15,os_version#16,brand#17,model#18,idfa#19,udid#20,usid#21,ekey#22,mid#23,sdk_version#24,longitude#25,latitude#26,channel#27,tel#28,cip#29,country#30,province#31,city#32,cityid#33,label#34,attrs#35,attrs_n#36,fr#37,year#1,month#2,day#3,hour#4,appid#5] ParquetRelation: omega.dwd_native


== Physical Plan ==
TungstenAggregate(key=[], functions=[(count(1),mode=Final,isDistinct=false)], output=[count#112L])
+- TungstenAggregate(key=[], functions=[(count(1),mode=Partial,isDistinct=false)], output=[count#115L])
   +- Limit 1
      +- ConvertToSafe
         +- TungstenAggregate(key=[], functions=[], output=[])
            +- TungstenExchange SinglePartition, None
               +- TungstenAggregate(key=[], functions=[], output=[])
                  +- Scan ParquetRelation: omega.dwd_native[] InputPaths: hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/00/appid=0, hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/00/appid=1, hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/00/appid=2, hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/00/appid=3, hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/00/appid=4, hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/00/appid=5, hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/00/appid=6, hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/01/appid=0, hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/01/appid=1, hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/01/appid=2, hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/01/appid=3, hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/01/appid=4, hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/01/appid=5, hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/01/appid=6, hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/02/appid=0, hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/02/appid=1, hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/02/appid=2, hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/02/appid=3, hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/02/appid=4, hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/02/appid=5, hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/02/appid=6, hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/03/appid=0, hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/03/





Exception in thread "main" java.lang.OutOfMemoryError: Java heap space
  at scala.collection.mutable.HashTable$class.resize(HashTable.scala:247)
  at scala.collection.mutable.HashTable$class.scala$collection$mutable$HashTable$$addEntry0(HashTable.scala:151)
  at scala.collection.mutable.HashTable$class.findOrAddEntry(HashTable.scala:163)
  at scala.collection.mutable.LinkedHashSet.findOrAddEntry(LinkedHashSet.scala:41)
  at scala.collection.mutable.LinkedHashSet.add(LinkedHashSet.scala:62)
  at scala.collection.mutable.LinkedHashSet.$plus$eq(LinkedHashSet.scala:59)
  at scala.collection.mutable.LinkedHashSet.$plus$eq(LinkedHashSet.scala:41)
  at scala.collection.mutable.GrowingBuilder.$plus$eq(GrowingBuilder.scala:26)
  at scala.collection.mutable.GrowingBuilder.$plus$eq(GrowingBuilder.scala:24)
  at scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:48)
  at scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:48)
  at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
  at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
  at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
  at scala.collection.mutable.GrowingBuilder.$plus$plus$eq(GrowingBuilder.scala:24)
  at scala.collection.generic.GenericCompanion.apply(GenericCompanion.scala:48)
  at org.apache.spark.sql.sources.HadoopFsRelation$.listLeafFilesInParallel(interfaces.scala:910)
  at org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache.listLeafFiles(interfaces.scala:445)
  at org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache.refresh(interfaces.scala:477)
  at org.apache.spark.sql.sources.HadoopFsRelation.org$apache$spark$sql$sources$HadoopFsRelation$$fileStatusCache$lzycompute(interfaces.scala:489)
  at org.apache.spark.sql.sources.HadoopFsRelation.org$apache$spark$sql$sources$HadoopFsRelation$$fileStatusCache(interfaces.scala:487)
  at org.apache.spark.sql.sources.HadoopFsRelation.cachedLeafStatuses(interfaces.scala:494)
  at org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$MetadataCache.refresh(ParquetRelation.scala:398)
  at org.apache.spark.sql.execution.datasources.parquet.ParquetRelation.org$apache$spark$sql$execution$datasources$parquet$ParquetRelation$$metadataCache$lzycompute(ParquetRelation.scala:145)
  at org.apache.spark.sql.execution.datasources.parquet.ParquetRelation.org$apache$spark$sql$execution$datasources$parquet$ParquetRelation$$metadataCache(ParquetRelation.scala:143)
  at org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$$anonfun$6.apply(ParquetRelation.scala:202)
  at org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$$anonfun$6.apply(ParquetRelation.scala:202)
  at scala.Option.getOrElse(Option.scala:120)
  at org.apache.spark.sql.execution.datasources.parquet.ParquetRelation.dataSchema(ParquetRelation.scala:202)
  at org.apache.spark.sql.sources.HadoopFsRelation.schema$lzycompute(interfaces.scala:636)
  at org.apache.spark.sql.sources.HadoopFsRelation.schema(interfaces.scala:635)
  at org.apache.spark.sql.execution.datasources.LogicalRelation.<init>(LogicalRelation.scala:37)





------------------ Original Message ------------------
From: "lian.cs.zju" <li...@gmail.com>
Sent: Friday, July 8, 2016, 4:47 PM
To: "Sea" <26...@qq.com>
Cc: "user" <us...@spark.apache.org>
Subject: Re: Bug about reading parquet files



What's the Spark version? Could you please also attach the result of explain(extended = true)?
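
For reference, this is roughly how that output can be produced in Spark 1.6
(a minimal sketch only; it assumes an existing HiveContext named sqlContext
and uses the query from the thread):

    // Print parsed, analyzed, optimized, and physical plans for the query.
    val df = sqlContext.sql(
      "select count(1) from omega.dwd_native " +
      "where year='2016' and month='07' and day='05' and hour='12' and appid='6'")
    df.explain(true)   // extended = true prints all four plan stages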


On Fri, Jul 8, 2016 at 4:33 PM, Sea <26...@qq.com> wrote:
I have a problem reading parquet files.
SQL:
select count(1) from omega.dwd_native where year='2016' and month='07' and day='05' and hour='12' and appid='6';
The Hive table is partitioned by (year, month, day, appid).


There are only two tasks, and it lists all directories in my table, not only /user/omega/events/v4/h/2016/07/07/12/appid=6
[Stage 1:>                                                          (0 + 0) / 2]


16/07/08 16:16:51 INFO sources.HadoopFsRelation: Listing hdfs://mycluster-tj/user/omega/events/v4/h/2016/05/31/21/appid=1
16/07/08 16:16:51 INFO sources.HadoopFsRelation: Listing hdfs://mycluster-tj/user/omega/events/v4/h/2016/06/28/20/appid=2
16/07/08 16:16:51 INFO sources.HadoopFsRelation: Listing hdfs://mycluster-tj/user/omega/events/v4/h/2016/07/22/21/appid=65537
16/07/08 16:16:51 INFO sources.HadoopFsRelation: Listing hdfs://mycluster-tj/user/omega/events/v4/h/2016/08/14/05/appid=65536

Re: Re: Bug about reading parquet files

Posted by Cheng Lian <li...@gmail.com>.
According to our offline discussion, the target table consists of 1M+
small Parquet files (~12 MB on average). The OOM occurred on the driver
side while listing input files.

My theory is that the total size of all the listed FileStatus objects is
too large for the driver heap, which caused the OOM.
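
As a rough back-of-the-envelope illustration (the per-object size is an
assumption, not a measurement): if each FileStatus plus its path string
occupies on the order of 0.5-1 KB on the heap, 1M+ entries add up to
roughly 0.5-1 GB or more of driver memory spent purely on the file
listing, which is enough to exhaust a modestly sized driver heap.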

Suggestions:

1. Merge those small Parquet files to reduce the file count. To be
efficient, a Parquet file should typically be at least as large as an
HDFS block (see the sketch below this list).
2. Try increasing the driver memory.
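
A sketch of both suggestions (the paths, file count, and memory size below
are illustrative assumptions, and sqlContext is assumed to be a HiveContext):

    // 1. Compact one partition's small files into fewer, larger Parquet files.
    //    Pick the target file count so each output file is at least one HDFS
    //    block (e.g. 128 MB); 8 here is only a placeholder.
    val part = sqlContext.read.parquet(
      "hdfs://mycluster-tj/user/omega/events/v4/h/2016/07/05/12/appid=6")
    part.coalesce(8)
        .write.mode("overwrite")
        .parquet("hdfs://mycluster-tj/user/omega/events_compacted/2016/07/05/12/appid=6")

    // 2. Give the driver a larger heap when submitting the job, for example:
    //    spark-submit --driver-memory 8g ...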

One possible improvement here in Spark is that we probably shouldn't 
list all input files of a partitioned table when the query only touches 
a fraction of all the partitions.
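
Until something like that is implemented, one possible workaround (a sketch,
assuming the directory layout shown in the listing logs above) is to point
the reader at the single partition directory the query needs, so only that
path gets listed:

    // Reads one partition directory directly instead of the whole table,
    // avoiding the full recursive listing. The partition values are implied
    // by the path rather than present as columns, which is fine for a count.
    val events = sqlContext.read.parquet(
      "hdfs://mycluster-tj/user/omega/events/v4/h/2016/07/05/12/appid=6")
    println(events.count())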

Cheng


