Posted to user@spark.apache.org by Sea <26...@qq.com> on 2016/07/08 12:44:46 UTC
Re: Bug about reading parquet files
My spark version is 1.6.1.
== Parsed Logical Plan ==
Aggregate [(count(1),mode=Complete,isDistinct=false) AS count#112L]
+- Limit 1
   +- Aggregate [(count(1),mode=Complete,isDistinct=false) AS events#0L]
      +- Filter ((concat(year#1,month#2,day#3,hour#4) = 2016070700) && (appid#5 = 6))
         +- Subquery dwd_native
            +- Relation[oid#6,eventid#7,clientid#8L,uid#9,passengerid#10L,phone_time#11L,server_time#12L,app_name#13,app_version#14,os_type#15,os_version#16,brand#17,model#18,idfa#19,udid#20,usid#21,ekey#22,mid#23,sdk_version#24,longitude#25,latitude#26,channel#27,tel#28,cip#29,country#30,province#31,city#32,cityid#33,label#34,attrs#35,attrs_n#36,fr#37,year#1,month#2,day#3,hour#4,appid#5] ParquetRelation: omega.dwd_native
== Analyzed Logical Plan ==
count: bigint
Aggregate [(count(1),mode=Complete,isDistinct=false) AS count#112L]
+- Limit 1
   +- Aggregate [(count(1),mode=Complete,isDistinct=false) AS events#0L]
      +- Filter ((concat(year#1,month#2,day#3,hour#4) = 2016070700) && (appid#5 = 6))
         +- Subquery dwd_native
            +- Relation[oid#6,eventid#7,clientid#8L,uid#9,passengerid#10L,phone_time#11L,server_time#12L,app_name#13,app_version#14,os_type#15,os_version#16,brand#17,model#18,idfa#19,udid#20,usid#21,ekey#22,mid#23,sdk_version#24,longitude#25,latitude#26,channel#27,tel#28,cip#29,country#30,province#31,city#32,cityid#33,label#34,attrs#35,attrs_n#36,fr#37,year#1,month#2,day#3,hour#4,appid#5] ParquetRelation: omega.dwd_native
== Optimized Logical Plan ==
Aggregate [(count(1),mode=Complete,isDistinct=false) AS count#112L]
+- Limit 1
   +- Aggregate
      +- Project
         +- Filter ((concat(year#1,month#2,day#3,hour#4) = 2016070700) && (appid#5 = 6))
            +- Relation[oid#6,eventid#7,clientid#8L,uid#9,passengerid#10L,phone_time#11L,server_time#12L,app_name#13,app_version#14,os_type#15,os_version#16,brand#17,model#18,idfa#19,udid#20,usid#21,ekey#22,mid#23,sdk_version#24,longitude#25,latitude#26,channel#27,tel#28,cip#29,country#30,province#31,city#32,cityid#33,label#34,attrs#35,attrs_n#36,fr#37,year#1,month#2,day#3,hour#4,appid#5] ParquetRelation: omega.dwd_native
== Physical Plan ==
TungstenAggregate(key=[], functions=[(count(1),mode=Final,isDistinct=false)], output=[count#112L])
+- TungstenAggregate(key=[], functions=[(count(1),mode=Partial,isDistinct=false)], output=[count#115L])
   +- Limit 1
      +- ConvertToSafe
         +- TungstenAggregate(key=[], functions=[], output=[])
            +- TungstenExchange SinglePartition, None
               +- TungstenAggregate(key=[], functions=[], output=[])
                  +- Scan ParquetRelation: omega.dwd_native[] InputPaths: hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/00/appid=0, hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/00/appid=1, hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/00/appid=2, hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/00/appid=3, hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/00/appid=4, hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/00/appid=5, hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/00/appid=6, hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/01/appid=0, hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/01/appid=1, hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/01/appid=2, hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/01/appid=3, hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/01/appid=4, hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/01/appid=5, hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/01/appid=6, hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/02/appid=0, hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/02/appid=1, hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/02/appid=2, hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/02/appid=3, hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/02/appid=4, hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/02/appid=5, hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/02/appid=6, hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/03/appid=0, hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/03/
Exception in thread "main" java.lang.OutOfMemoryError: Java heap space
at scala.collection.mutable.HashTable$class.resize(HashTable.scala:247)
at scala.collection.mutable.HashTable$class.scala$collection$mutable$HashTable$$addEntry0(HashTable.scala:151)
at scala.collection.mutable.HashTable$class.findOrAddEntry(HashTable.scala:163)
at scala.collection.mutable.LinkedHashSet.findOrAddEntry(LinkedHashSet.scala:41)
at scala.collection.mutable.LinkedHashSet.add(LinkedHashSet.scala:62)
at scala.collection.mutable.LinkedHashSet.$plus$eq(LinkedHashSet.scala:59)
at scala.collection.mutable.LinkedHashSet.$plus$eq(LinkedHashSet.scala:41)
at scala.collection.mutable.GrowingBuilder.$plus$eq(GrowingBuilder.scala:26)
at scala.collection.mutable.GrowingBuilder.$plus$eq(GrowingBuilder.scala:24)
at scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:48)
at scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:48)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.GrowingBuilder.$plus$plus$eq(GrowingBuilder.scala:24)
at scala.collection.generic.GenericCompanion.apply(GenericCompanion.scala:48)
at org.apache.spark.sql.sources.HadoopFsRelation$.listLeafFilesInParallel(interfaces.scala:910)
at org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache.listLeafFiles(interfaces.scala:445)
at org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache.refresh(interfaces.scala:477)
at org.apache.spark.sql.sources.HadoopFsRelation.org$apache$spark$sql$sources$HadoopFsRelation$$fileStatusCache$lzycompute(interfaces.scala:489)
at org.apache.spark.sql.sources.HadoopFsRelation.org$apache$spark$sql$sources$HadoopFsRelation$$fileStatusCache(interfaces.scala:487)
at org.apache.spark.sql.sources.HadoopFsRelation.cachedLeafStatuses(interfaces.scala:494)
at org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$MetadataCache.refresh(ParquetRelation.scala:398)
at org.apache.spark.sql.execution.datasources.parquet.ParquetRelation.org$apache$spark$sql$execution$datasources$parquet$ParquetRelation$$metadataCache$lzycompute(ParquetRelation.scala:145)
at org.apache.spark.sql.execution.datasources.parquet.ParquetRelation.org$apache$spark$sql$execution$datasources$parquet$ParquetRelation$$metadataCache(ParquetRelation.scala:143)
at org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$$anonfun$6.apply(ParquetRelation.scala:202)
at org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$$anonfun$6.apply(ParquetRelation.scala:202)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.sql.execution.datasources.parquet.ParquetRelation.dataSchema(ParquetRelation.scala:202)
at org.apache.spark.sql.sources.HadoopFsRelation.schema$lzycompute(interfaces.scala:636)
at org.apache.spark.sql.sources.HadoopFsRelation.schema(interfaces.scala:635)
at org.apache.spark.sql.execution.datasources.LogicalRelation.&lt;init&gt;(LogicalRelation.scala:37)
------------------ Original Message ------------------
From: "lian.cs.zju" <li...@gmail.com>
Sent: Friday, July 8, 2016, 4:47 PM
To: "Sea" <26...@qq.com>
Cc: "user" <us...@spark.apache.org>
Subject: Re: Bug about reading parquet files
What's the Spark version? Could you please also attach result of explain(extended = true)?
On Fri, Jul 8, 2016 at 4:33 PM, Sea <26...@qq.com> wrote:
I have a problem reading parquet files.
sql:
select count(1) from omega.dwd_native where year='2016' and month='07' and day='05' and hour='12' and appid='6';
The Hive table is partitioned by (year, month, day, hour, appid).
There are only two tasks, and Spark lists all directories in my table, not only /user/omega/events/v4/h/2016/07/07/12/appid=6
[Stage 1:> (0 + 0) / 2]
16/07/08 16:16:51 INFO sources.HadoopFsRelation: Listing hdfs://mycluster-tj/user/omega/events/v4/h/2016/05/31/21/appid=1
16/07/08 16:16:51 INFO sources.HadoopFsRelation: Listing hdfs://mycluster-tj/user/omega/events/v4/h/2016/06/28/20/appid=2
16/07/08 16:16:51 INFO sources.HadoopFsRelation: Listing hdfs://mycluster-tj/user/omega/events/v4/h/2016/07/22/21/appid=65537
16/07/08 16:16:51 INFO sources.HadoopFsRelation: Listing hdfs://mycluster-tj/user/omega/events/v4/h/2016/08/14/05/appid=65536
Re: Re: Bug about reading parquet files
Posted by Cheng Lian <li...@gmail.com>.
According to our offline discussion, the target table consists of 1M+ small Parquet files (~12 MB on average). The OOM occurred on the driver side while listing input files.
My theory is that the total size of all listed FileStatus objects is too large for the driver, which caused the OOM.
Suggestions:
1. Merge those small Parquet files to reduce the file count. Also, for efficiency, a Parquet file should typically be at least as large as an HDFS block.
2. Try increasing the driver memory.
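For the first suggestion, a minimal sketch of compacting one partition directory with Spark 1.6 itself (the output path and target file count here are hypothetical placeholders, not part of the original report; size the count so each output file is at least one HDFS block):

```scala
// Sketch: compact one partition directory of many small Parquet files.
// The output location and coalesce(4) are illustrative assumptions.
val part = sqlContext.read.parquet(
  "hdfs://mycluster-tj/user/omega/events/v4/h/2016/07/07/12/appid=6")

// coalesce() reduces the number of output files without a full shuffle.
part.coalesce(4)
  .write
  .mode("overwrite")
  .parquet("hdfs://mycluster-tj/user/omega/events_compacted/2016/07/07/12/appid=6")
```

The compacted directory can then replace the original one, or be registered as a new partition location.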
One possible improvement here in Spark is that we probably shouldn't
list all input files of a partitioned table when the query only touches
a fraction of all the partitions.
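Until such an improvement lands, one possible workaround is to bypass the table and point Spark directly at the single partition directory the query touches, so the driver never lists the table's other files (a sketch only, assuming the partition path from the original report):

```scala
// Workaround sketch: read only the needed partition directory,
// avoiding a recursive listing of all 1M+ files in the table.
val part = sqlContext.read.parquet(
  "hdfs://mycluster-tj/user/omega/events/v4/h/2016/07/07/12/appid=6")
part.registerTempTable("dwd_native_part")  // hypothetical temp table name
sqlContext.sql("select count(1) from dwd_native_part").show()
```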
Cheng