Posted to dev@spark.apache.org by StanZhai <ma...@zhaishidan.cn> on 2015/09/19 13:43:28 UTC
[SparkSQL]How does spark handle a parquet file in parallel?
Hi all,
I'm using Spark (1.4.1) + Hive (0.13.1), and I found that a large amount of
network I/O appears when querying a parquet table *with only one part file*
using SparkSQL.
The SQL is:

SELECT concat(year(fkbb5855f0), "-", month(fkbb5855f0), "-",
       day(fkbb5855f0), " 00:00:00"), COUNT(fk919b1d80)
FROM test
WHERE fkbb5855f0 >= '2015-08-02 00:00:00'
  AND fkbb5855f0 < '2015-09-01 00:00:00'
  AND fk418c5509 IN ('add_summary')
  AND (fkbb5855f0 != '' AND fkbb5855f0 IS NOT NULL)
GROUP BY year(fkbb5855f0), month(fkbb5855f0), day(fkbb5855f0)
The explain output of the SQL is:
== Parsed Logical Plan ==
'Limit 10000
'Aggregate ['year('fkbb5855f0),'month('fkbb5855f0),'day('fkbb5855f0)],
['concat('year('fkbb5855f0),-,'month('fkbb5855f0),-,'day('fkbb5855f0),
00:00:00) AS _c0#14,COUNT('fk919b1d80) AS _c1#15]
'Filter (((('fkbb5855f0 >= 2015-08-02 00:00:00) &&
('fkbb5855f0 < 2015-09-01 00:00:00)) && 'fk418c5509 IN
(add_summary)) && (NOT ('fkbb5855f0 = ) && IS NOT NULL
'fkbb5855f0))
'UnresolvedRelation [test], None
== Analyzed Logical Plan ==
_c0: string, _c1: bigint
Limit 10000
Aggregate
[HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFYear(fkbb5855f0#36),HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFMonth(fkbb5855f0#36),HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFDayOfMonth(fkbb5855f0#36)],
[HiveGenericUdf#org.apache.hadoop.hive.ql.udf.generic.GenericUDFConcat(HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFYear(fkbb5855f0#36),-,HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFMonth(fkbb5855f0#36),-,HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFDayOfMonth(fkbb5855f0#36),
00:00:00) AS _c0#14,COUNT(fk919b1d80#34) AS _c1#15L]
Filter ((((fkbb5855f0#36 >= 2015-08-02 00:00:00) &&
(fkbb5855f0#36 < 2015-09-01 00:00:00)) && fk418c5509#35 IN
(add_summary)) && (NOT (fkbb5855f0#36 = ) && IS NOT NULL
fkbb5855f0#36))
Subquery test
Relation[fkb80bb774#33,fk919b1d80#34,fk418c5509#35,fkbb5855f0#36]
org.apache.spark.sql.parquet.ParquetRelation2@5a271032
== Optimized Logical Plan ==
Limit 10000
Aggregate
[HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFYear(fkbb5855f0#36),HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFMonth(fkbb5855f0#36),HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFDayOfMonth(fkbb5855f0#36)],
[HiveGenericUdf#org.apache.hadoop.hive.ql.udf.generic.GenericUDFConcat(HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFYear(fkbb5855f0#36),-,HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFMonth(fkbb5855f0#36),-,HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFDayOfMonth(fkbb5855f0#36),
00:00:00) AS _c0#14,COUNT(fk919b1d80#34) AS _c1#15L]
Project [fkbb5855f0#36,fk919b1d80#34]
Filter ((((fkbb5855f0#36 >= 2015-08-02 00:00:00) &&
(fkbb5855f0#36 < 2015-09-01 00:00:00)) && fk418c5509#35 INSET
(add_summary)) && (NOT (fkbb5855f0#36 = ) && IS NOT NULL
fkbb5855f0#36))
Relation[fkb80bb774#33,fk919b1d80#34,fk418c5509#35,fkbb5855f0#36]
org.apache.spark.sql.parquet.ParquetRelation2@5a271032
== Physical Plan ==
Limit 10000
Aggregate false, [PartialGroup#42,PartialGroup#43,PartialGroup#44],
[HiveGenericUdf#org.apache.hadoop.hive.ql.udf.generic.GenericUDFConcat(PartialGroup#42,-,PartialGroup#43,-,PartialGroup#44,
00:00:00) AS _c0#14,Coalesce(SUM(PartialCount#41L),0) AS _c1#15L]
Exchange (HashPartitioning 200)
Aggregate true,
[HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFYear(fkbb5855f0#36),HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFMonth(fkbb5855f0#36),HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFDayOfMonth(fkbb5855f0#36)],
[HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFYear(fkbb5855f0#36) AS
PartialGroup#42,HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFMonth(fkbb5855f0#36)
AS
PartialGroup#43,HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFDayOfMonth(fkbb5855f0#36)
AS PartialGroup#44,COUNT(fk919b1d80#34) AS PartialCount#41L]
Project [fkbb5855f0#36,fk919b1d80#34]
Filter (((((fkbb5855f0#36 >= 2015-08-02 00:00:00)
&& (fkbb5855f0#36 < 2015-09-01 00:00:00)) &&
fk418c5509#35 INSET (add_summary)) && NOT (fkbb5855f0#36 = ))
&& IS NOT NULL fkbb5855f0#36)
PhysicalRDD
[fkbb5855f0#36,fk919b1d80#34,fk418c5509#35], MapPartitionsRDD[5] at
Code Generation: false
== RDD ==
The size of the `test` table is 3.3GB. I have 5 nodes in the Hadoop cluster,
and Spark uses the same cluster. The `test` table has 3 replications and the
block size is 64MB.
The task count of the first stage is 54 when SparkSQL executes the SQL, and
the Locality Level of every task is NODE_LOCAL. I used dstat to monitor one
node of the cluster, and there was a large amount of network I/O:
----total-cpu-usage---- -dsk/total- -net/total- ---paging-- ---system--
usr sys idl wai hiq siq| read  writ| recv  send|  in   out | int   csw
  1   0  99   0   0   0| 107k  389k|   0     0 | 193B 1097B|2408  5443
  0   0 100   0   0   0|   0     0 |5709B 3285B|   0     0 |1921  4282
  0   0 100   0   0   0|1936k    0 |3761B 1251B|   0     0 |1907  4197
  0   0 100   0   0   0|   0   584k|3399B 1539B|   0     0 |1903  4338
  0   0  99   0   0   0|1936k    0 |4332B 1447B|   0     0 |2070  4448
  4   1  93   3   0   0|  16M    0 |5117B 1439B|   0     0 |9177   11k
  1   1  77  21   0   0| 215M    0 | 921k   68M|   0     0 | 21k   10k
  6   1  79  14   0   0| 175M    0 | 327k   19M|   0     0 | 10k  8207
 32   1  68   0   0   0|3308k    0 | 237k   14M|   0     0 | 16k  6536
 22   0  78   0   0   0|2048k    0 |  98k 5733k|   0     0 |9190  5823
 30   0  69   0   0   0|8080k 8192B| 175k   11M|   0     0 | 18k  6950
 23   0  77   0   0   0|  18M    0 | 727k   52M|   0     0 | 25k  8648
 22   0  78   0   0   0|  28M    0 | 920k   96M|   0     0 | 26k   11k
 22   0  78   0   0   0|  31M    0 |1003k  114M|   0     0 | 25k   10k
 22   0  78   0   0   0|9372k    0 | 487k   49M|   0     0 |9935  5599
 18   1  81   0   0   0|   0   125k|  47k 2027k|   0     0 |8820  5358
  4   1  95   0   0   0|  28M  450k| 289k   23M|   0     0 | 16k  7992
  0   0  99   0   0   0|  10M    0 | 446k   42M|   0     0 |3765  5262
  0   0 100   0   0   0|1944k    0 |8540B 1364k|   0     0 |1943  4378
  0   0  99   0   0   0|   0   426k|  11k 2469B|   0     0 |2008  4476
  1   0  99   0   0   0|1852k  368k|  16k 1687B|   0     0 |2111  4509
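As a side note, the 54 tasks in the first stage line up with one split per
64MB HDFS block of the single part file. A quick back-of-the-envelope check
(my own arithmetic, using the 3.34G file size from the experiment below):

```python
import math

file_size_mb = 3.34 * 1024  # the single ~3.34 GB parquet part file
block_mb = 64               # HDFS block size

splits = math.ceil(file_size_mb / block_mb)
print(splits)  # 54
```

So Spark is creating exactly one task per HDFS block of the file.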
But the SQL's result size is only 3KB, so the network I/O confuses me.
Something else confuses me too: I found that *half of the tasks' `Input
Size/Records` is `64.0 MB (hadoop) / 0`*. I think Spark dispatches the jobs
in a wrong way.
I split the `test` table into different numbers of part files to validate my
theory. The data is:

part files | single file size | query elapsed | network IO
         1 | 3.34G            | 12s           | ~200MB/node
         5 | 800MB            | 10s           | ~100MB/node
        64 | 74MB             | 8s            | ~50MB/node
        74 | 64MB             | 7s            | only a little
       150 | 32MB             | 6s            | only a little
       297 | 20MB             | 5s            | only a little
I also set the replication of the `test` table to 5, and no network I/O
appeared, but the query took 11s! So I think the network I/O comes from Spark
fetching data from a remote node when a parquet file is larger than the
Hadoop block size (64MB).
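To put a rough number on that theory (my own back-of-the-envelope, assuming
HDFS places a block's replicas uniformly at random across the 5 nodes): when
a task's 64MB split does not line up with a parquet row-group boundary, the
reader must also touch an adjacent block, which is local only if one of that
block's replicas happens to sit on the same node:

```python
from math import comb

nodes = 5

def p_remote(replication):
    # probability that NONE of a block's replicas live on a given node,
    # assuming replicas are placed uniformly at random over the 5 nodes
    return comb(nodes - 1, replication) / comb(nodes, replication)

print(p_remote(3))  # 0.4 -> remote reads expected with replication 3
print(p_remote(5))  # 0.0 -> consistent with "no network IO" at replication 5
```

That would explain why raising the replication to 5 eliminates the network
I/O entirely, and why files at or below 64MB show only a little.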
I want to know:
1. How does Spark dispatch tasks when a parquet table has only one part file
that is larger than the Hadoop block size?
2. Under which conditions is a task's `Input Size/Records` reported as `64.0
MB (hadoop) / 0`, `5.2 KB (hadoop) / 0`, or `0.0 B (hadoop) / 50`?
Any ideas?
Thanks
--
View this message in context: http://apache-spark-developers-list.1001551.n3.nabble.com/SparkSQL-How-does-spark-handle-a-parquet-file-in-parallel-tp14210.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.