Posted to dev@spark.apache.org by StanZhai <ma...@zhaishidan.cn> on 2015/09/19 13:43:28 UTC

[SparkSQL]How does spark handle a parquet file in parallel?

Hi all,

I'm using Spark (1.4.1) + Hive (0.13.1). I found that a large amount of
network IO occurs when querying a parquet table *with only one part file*
using SparkSQL.

The SQL is:

SELECT concat(year(fkbb5855f0), "-", month(fkbb5855f0), "-", day(fkbb5855f0), " 00:00:00"), COUNT(fk919b1d80)
FROM test
WHERE fkbb5855f0 >= '2015-08-02 00:00:00' AND fkbb5855f0 < '2015-09-01 00:00:00'
  AND fk418c5509 IN ('add_summary')
  AND (fkbb5855f0 != '' AND fkbb5855f0 IS NOT NULL)
GROUP BY year(fkbb5855f0), month(fkbb5855f0), day(fkbb5855f0)
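
For reference, this is roughly how I submit the query and reproduce the plan below; the variable name `sqlContext` (a HiveContext in spark-shell) is an assumption on my side, not something special in my setup:

import org.apache.spark.sql.DataFrame

// Sketch: submit the query from spark-shell (Spark 1.4.x) and print its plan.
// `sqlContext` is assumed to be a HiveContext with the `test` table registered.
val query = """
  SELECT concat(year(fkbb5855f0), "-", month(fkbb5855f0), "-", day(fkbb5855f0), " 00:00:00"),
         COUNT(fk919b1d80)
  FROM test
  WHERE fkbb5855f0 >= '2015-08-02 00:00:00' AND fkbb5855f0 < '2015-09-01 00:00:00'
    AND fk418c5509 IN ('add_summary')
    AND (fkbb5855f0 != '' AND fkbb5855f0 IS NOT NULL)
  GROUP BY year(fkbb5855f0), month(fkbb5855f0), day(fkbb5855f0)
"""
val df: DataFrame = sqlContext.sql(query)
df.explain(true)   // prints the parsed/analyzed/optimized/physical plans shown below
df.show()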

The SQL's query explain is:

== Parsed Logical Plan ==
'Limit 10000
 'Aggregate ['year('fkbb5855f0),'month('fkbb5855f0),'day('fkbb5855f0)], ['concat('year('fkbb5855f0),-,'month('fkbb5855f0),-,'day('fkbb5855f0), 00:00:00) AS _c0#14,COUNT('fk919b1d80) AS _c1#15]
  'Filter (((('fkbb5855f0 >= 2015-08-02 00:00:00) && ('fkbb5855f0 < 2015-09-01 00:00:00)) && 'fk418c5509 IN (add_summary)) && (NOT ('fkbb5855f0 = ) && IS NOT NULL 'fkbb5855f0))
   'UnresolvedRelation [test], None

== Analyzed Logical Plan ==
_c0: string, _c1: bigint
Limit 10000
 Aggregate [HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFYear(fkbb5855f0#36),HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFMonth(fkbb5855f0#36),HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFDayOfMonth(fkbb5855f0#36)], [HiveGenericUdf#org.apache.hadoop.hive.ql.udf.generic.GenericUDFConcat(HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFYear(fkbb5855f0#36),-,HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFMonth(fkbb5855f0#36),-,HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFDayOfMonth(fkbb5855f0#36), 00:00:00) AS _c0#14,COUNT(fk919b1d80#34) AS _c1#15L]
  Filter ((((fkbb5855f0#36 >= 2015-08-02 00:00:00) && (fkbb5855f0#36 < 2015-09-01 00:00:00)) && fk418c5509#35 IN (add_summary)) && (NOT (fkbb5855f0#36 = ) && IS NOT NULL fkbb5855f0#36))
   Subquery test
    Relation[fkb80bb774#33,fk919b1d80#34,fk418c5509#35,fkbb5855f0#36] org.apache.spark.sql.parquet.ParquetRelation2@5a271032

== Optimized Logical Plan ==
Limit 10000
 Aggregate [HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFYear(fkbb5855f0#36),HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFMonth(fkbb5855f0#36),HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFDayOfMonth(fkbb5855f0#36)], [HiveGenericUdf#org.apache.hadoop.hive.ql.udf.generic.GenericUDFConcat(HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFYear(fkbb5855f0#36),-,HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFMonth(fkbb5855f0#36),-,HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFDayOfMonth(fkbb5855f0#36), 00:00:00) AS _c0#14,COUNT(fk919b1d80#34) AS _c1#15L]
  Project [fkbb5855f0#36,fk919b1d80#34]
   Filter ((((fkbb5855f0#36 >= 2015-08-02 00:00:00) && (fkbb5855f0#36 < 2015-09-01 00:00:00)) && fk418c5509#35 INSET (add_summary)) && (NOT (fkbb5855f0#36 = ) && IS NOT NULL fkbb5855f0#36))
    Relation[fkb80bb774#33,fk919b1d80#34,fk418c5509#35,fkbb5855f0#36] org.apache.spark.sql.parquet.ParquetRelation2@5a271032

== Physical Plan ==
Limit 10000
 Aggregate false, [PartialGroup#42,PartialGroup#43,PartialGroup#44], [HiveGenericUdf#org.apache.hadoop.hive.ql.udf.generic.GenericUDFConcat(PartialGroup#42,-,PartialGroup#43,-,PartialGroup#44, 00:00:00) AS _c0#14,Coalesce(SUM(PartialCount#41L),0) AS _c1#15L]
  Exchange (HashPartitioning 200)
   Aggregate true, [HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFYear(fkbb5855f0#36),HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFMonth(fkbb5855f0#36),HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFDayOfMonth(fkbb5855f0#36)], [HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFYear(fkbb5855f0#36) AS PartialGroup#42,HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFMonth(fkbb5855f0#36) AS PartialGroup#43,HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFDayOfMonth(fkbb5855f0#36) AS PartialGroup#44,COUNT(fk919b1d80#34) AS PartialCount#41L]
    Project [fkbb5855f0#36,fk919b1d80#34]
     Filter (((((fkbb5855f0#36 >= 2015-08-02 00:00:00) && (fkbb5855f0#36 < 2015-09-01 00:00:00)) && fk418c5509#35 INSET (add_summary)) && NOT (fkbb5855f0#36 = )) && IS NOT NULL fkbb5855f0#36)
      PhysicalRDD [fkbb5855f0#36,fk919b1d80#34,fk418c5509#35], MapPartitionsRDD[5] at

Code Generation: false
== RDD ==

The size of the `test` table is 3.3GB. I have 5 nodes in the Hadoop cluster,
and Spark runs on the same cluster. The `test` table has a replication factor
of 3 and the HDFS block size is 64MB.
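
As a side note, this is a minimal sketch of how I inspect the block layout of the single part file with the Hadoop FileSystem API; the file path is a hypothetical example, not the real location:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Sketch: list the HDFS blocks of the single parquet part file and the hosts
// holding each replica. A 3.3GB file with a 64MB block size should show
// roughly 53 blocks spread across the 5 datanodes. Path is hypothetical.
val fs = FileSystem.get(new Configuration())
val status = fs.getFileStatus(new Path("/user/hive/warehouse/test/part-00000.parquet"))
val blocks = fs.getFileBlockLocations(status, 0, status.getLen)
blocks.zipWithIndex.foreach { case (b, i) =>
  println(s"block $i offset=${b.getOffset} length=${b.getLength} hosts=${b.getHosts.mkString(",")}")
}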

The task count of the first stage is 54 when SparkSQL executes the SQL, and
the Locality Level of all tasks is NODE_LOCAL. I used dstat to monitor a node
of the cluster, and there is a large amount of network IO:

----total-cpu-usage---- -dsk/total- -net/total- ---paging-- ---system--
usr sys idl wai hiq siq| read  writ| recv  send|  in   out | int   csw
  1   0  99   0   0   0| 107k  389k|   0     0 | 193B 1097B|2408  5443
  0   0 100   0   0   0|   0     0 |5709B 3285B|   0     0 |1921  4282
  0   0 100   0   0   0|1936k    0 |3761B 1251B|   0     0 |1907  4197
  0   0 100   0   0   0|   0   584k|3399B 1539B|   0     0 |1903  4338
  0   0  99   0   0   0|1936k    0 |4332B 1447B|   0     0 |2070  4448
  4   1  93   3   0   0|  16M    0 |5117B 1439B|   0     0 |9177   11k
  1   1  77  21   0   0| 215M    0 | 921k   68M|   0     0 | 21k   10k
  6   1  79  14   0   0| 175M    0 | 327k   19M|   0     0 | 10k  8207
 32   1  68   0   0   0|3308k    0 | 237k   14M|   0     0 | 16k  6536
 22   0  78   0   0   0|2048k    0 |  98k 5733k|   0     0 |9190  5823
 30   0  69   0   0   0|8080k 8192B| 175k   11M|   0     0 | 18k  6950
 23   0  77   0   0   0|  18M    0 | 727k   52M|   0     0 | 25k  8648
 22   0  78   0   0   0|  28M    0 | 920k   96M|   0     0 | 26k   11k
 22   0  78   0   0   0|  31M    0 |1003k  114M|   0     0 | 25k   10k
 22   0  78   0   0   0|9372k    0 | 487k   49M|   0     0 |9935  5599
 18   1  81   0   0   0|   0   125k|  47k 2027k|   0     0 |8820  5358
  4   1  95   0   0   0|  28M  450k| 289k   23M|   0     0 | 16k  7992
  0   0  99   0   0   0|  10M    0 | 446k   42M|   0     0 |3765  5262
  0   0 100   0   0   0|1944k    0 |8540B 1364k|   0     0 |1943  4378
  0   0  99   0   0   0|   0   426k|  11k 2469B|   0     0 |2008  4476
  1   0  99   0   0   0|1852k  368k|  16k 1687B|   0     0 |2111  4509

But the SQL's result size is only 3KB, so the network IO confuses me. Something
else also confuses me: I found that *half of the tasks' `Input Size/Records` is
`64.0 MB (hadoop) / 0`*. I think Spark dispatches the tasks in a wrong way.
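
As a side check, the split count can be confirmed from the Spark side; this is a minimal sketch assuming the table is registered as `test` in the HiveContext:

// Sketch: the number of partitions of the scan should roughly match the 54
// tasks of the first stage (about ceil(3.3GB / 64MB) splits).
val testDf = sqlContext.table("test")
println(testDf.rdd.partitions.length)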

I split the `test` table into different numbers of part files to validate my
hypothesis. The data is (a sketch of how I rewrite the table follows the table):

part files | single file length | query elapsed | network IO
1          | 3.34G              | 12s           | ~200MB/node
5          | 800MB              | 10s           | ~100MB/node
64         | 74MB               | 8s            | ~50MB/node
74         | 64MB               | 7s            | only a little
150        | 32MB               | 6s            | only a little
297        | 20MB               | 5s            | only a little
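
This is roughly how each split variant is produced with the DataFrame API; the paths are hypothetical placeholders for my real table locations:

// Sketch: rewrite the table into N part files (used to produce the different
// part-file counts in the table above). Paths are hypothetical.
val n = 74
sqlContext.read.parquet("/user/hive/warehouse/test")
  .repartition(n)
  .write.parquet("/user/hive/warehouse/test_split_" + n)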
I also set the replication factor of the `test` table to 5; no network IO
appeared, but the query took 11s! So, I think the network IO comes from Spark
fetching data from a remote node when a parquet part file is larger than the
Hadoop block size (64MB).
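
For completeness, the replication change can be made with the Hadoop FileSystem API (or `hdfs dfs -setrep`); again the path is a hypothetical example:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Sketch: raise the replication factor of the part file to 5 so every node
// holds a local copy of every block. Path is hypothetical.
val fs = FileSystem.get(new Configuration())
fs.setReplication(new Path("/user/hive/warehouse/test/part-00000.parquet"), 5.toShort)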

I want to know:
1. How does Spark dispatch tasks when a parquet table has only one part file
that is larger than the Hadoop block size?
2. Under which conditions is a task's `Input Size/Records` `64.0 MB (hadoop) / 0`,
`5.2 KB (hadoop) / 0`, or `0.0 B (hadoop) / 50`?

Any ideas? 
Thanks



