Posted to dev@spark.apache.org by 163 <he...@163.com> on 2017/07/14 09:43:51 UTC

How to tune the performance of TPC-H query 5 in Spark

> 
> I rewrote TPC-H query 5 using the DataFrame API:
> import org.apache.spark.sql.functions.{sum, udf}
> import spark.implicits._
> 
> val forders = spark.read.parquet("hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/orders").filter("o_orderdate < '1995-01-01' and o_orderdate >= '1994-01-01'").select("o_custkey", "o_orderkey")
> val flineitem = spark.read.parquet("hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/lineitem")
> val fcustomer = spark.read.parquet("hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/customer")
> val fsupplier = spark.read.parquet("hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/supplier")
> val fregion = spark.read.parquet("hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/region").where("r_name = 'ASIA'").select($"r_regionkey")
> val fnation = spark.read.parquet("hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/nation")
> val decrease = udf { (x: Double, y: Double) => x * (1 - y) }
> val res = flineitem.join(forders, $"l_orderkey" === forders("o_orderkey"))
>      .join(fcustomer, $"o_custkey" === fcustomer("c_custkey"))
>      .join(fsupplier, $"l_suppkey" === fsupplier("s_suppkey") && $"c_nationkey" === fsupplier("s_nationkey"))
>      .join(fnation, $"s_nationkey" === fnation("n_nationkey"))
>      .join(fregion, $"n_regionkey" === fregion("r_regionkey"))
>      .select($"n_name", decrease($"l_extendedprice", $"l_discount").as("value"))
>      .groupBy($"n_name")
>      .agg(sum($"value").as("revenue"))
>      .sort($"revenue".desc).show()
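> 
> One thing I am not sure about is the decrease UDF above: would replacing it with a built-in column expression, roughly as in the sketch below, avoid the Scala UDF overhead? (This is only a sketch of what I mean, not something I have measured.)
> 
> // Same discounted-price computation as the decrease UDF, expressed with
> // native column arithmetic so Catalyst can optimize it directly.
> import org.apache.spark.sql.functions.lit
> val value = $"l_extendedprice" * (lit(1) - $"l_discount")
> // then use .select($"n_name", value.as("value")) in place of the UDF call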
> 
> My environment is one master (HDFS NameNode) and four workers (HDFS DataNodes), each with 40 cores and 128 GB of memory. The data set is TPC-H at 100 GB scale, stored on HDFS in Parquet format.
> The query runs in about 1.5 minutes. I noticed that reading these six tables with spark.read.parquet happens sequentially. How can I make these reads run in parallel? I sketched below what I have in mind, but I am not sure it is the right approach.
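> 
> For example, would something like the following sketch, which uses Scala Futures to overlap the driver-side Parquet reads, be a reasonable way to do it? (I have not verified that it helps.)
> 
> import scala.concurrent.{Await, Future}
> import scala.concurrent.duration.Duration
> import scala.concurrent.ExecutionContext.Implicits.global
> 
> // Kick off all six spark.read.parquet calls concurrently so the driver-side
> // footer/schema reads overlap instead of running one after another.
> val base = "hdfs://dell127:20500/SparkParquetDoubleTimestamp100G"
> val names = Seq("orders", "lineitem", "customer", "supplier", "region", "nation")
> val futures = names.map(n => Future(spark.read.parquet(s"$base/$n")))
> val Seq(ordersDF, lineitemDF, customerDF, supplierDF, regionDF, nationDF) =
>   futures.map(f => Await.result(f, Duration.Inf))
> 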
> I have already tuned data locality, spark.default.parallelism, and spark.serializer, and switched to the G1 garbage collector, but the runtime has not gone down.
> Is there any other advice on tuning this query's performance?
> Thank you.
Wenting He.